From 4e987fd64f73abf6c6a13dc5bb594baabfa9f34a Mon Sep 17 00:00:00 2001 From: Daniel Clayton Date: Fri, 4 Nov 2022 14:54:16 -0700 Subject: [PATCH] switch to tty input --- Makefile | 4 +- aws_okta_processor/core/fetcher.py | 2 +- aws_okta_processor/core/okta.py | 14 +- aws_okta_processor/core/prompt.py | 4 +- aws_okta_processor/core/saml.py | 2 +- .../core/{print_tty.py => tty.py} | 53 ++++++- tests/core/test_fetcher.py | 6 +- tests/core/test_okta.py | 10 +- tests/core/test_prompt.py | 10 +- tests/core/{test_print_tty.py => test_tty.py} | 131 ++++++++++++------ 10 files changed, 165 insertions(+), 71 deletions(-) rename aws_okta_processor/core/{print_tty.py => tty.py} (68%) rename tests/core/{test_print_tty.py => test_tty.py} (61%) diff --git a/Makefile b/Makefile index 1406a22..3fb052e 100644 --- a/Makefile +++ b/Makefile @@ -14,8 +14,10 @@ flake8: coverage: PYTHONPATH=$(PYTHONPATH) pipenv run py.test --cov-config .coveragerc --verbose --cov-report term --cov-report xml --cov-report html --cov=aws_okta_processor tests -publish: +package: python setup.py sdist bdist_wheel + +publish: package twine check dist/* twine upload dist/* rm -fr build dist .egg requests.egg-info diff --git a/aws_okta_processor/core/fetcher.py b/aws_okta_processor/core/fetcher.py index 26ea223..d9474d8 100644 --- a/aws_okta_processor/core/fetcher.py +++ b/aws_okta_processor/core/fetcher.py @@ -8,7 +8,7 @@ from hashlib import sha1 from aws_okta_processor.core.okta import Okta from botocore.credentials import CachedCredentialFetcher -from aws_okta_processor.core.print_tty import print_tty +from aws_okta_processor.core.tty import print_tty class SAMLFetcher(CachedCredentialFetcher): diff --git a/aws_okta_processor/core/okta.py b/aws_okta_processor/core/okta.py index bfb70d0..46dd433 100644 --- a/aws_okta_processor/core/okta.py +++ b/aws_okta_processor/core/okta.py @@ -14,7 +14,7 @@ from requests import ConnectTimeout from requests import ConnectionError from collections import OrderedDict -from aws_okta_processor.core.print_tty import print_tty +from aws_okta_processor.core.tty import print_tty, input_tty from six import add_metaclass @@ -73,23 +73,23 @@ def __init__( if not self.organization: print_tty(string="Organization: ", newline=False) - self.organization = input() + self.organization = input_tty() if not self.user_name: print_tty(string="UserName: ", newline=False) - self.user_name = input() + self.user_name = input_tty() if not self.okta_session_id: if not self.user_name: print_tty(string="UserName: ", newline=False) - self.user_name = input() + self.user_name = input_tty() if not user_pass: user_pass = getpass.getpass() if not self.organization: print_tty(string="Organization: ", newline=False) - self.organization = input() + self.organization = input_tty() self.okta_single_use_token = self.get_okta_single_use_token( user_name=self.user_name, @@ -450,7 +450,7 @@ def __init__(self, link=None): @staticmethod def payload(): print_tty("Token: ", newline=False) - return {"passCode": input()} + return {"passCode": input_tty()} def retry(self, response): return False @@ -465,7 +465,7 @@ def __init__(self, link=None): @staticmethod def payload(): print_tty("Hardware Token: ", newline=False) - return {"passCode": input()} + return {"passCode": input_tty()} def retry(self, response): return False diff --git a/aws_okta_processor/core/prompt.py b/aws_okta_processor/core/prompt.py index b32bcbc..47ad788 100644 --- a/aws_okta_processor/core/prompt.py +++ b/aws_okta_processor/core/prompt.py @@ -2,7 +2,7 @@ import six from collections.abc import Mapping -from aws_okta_processor.core.print_tty import print_tty +from aws_okta_processor.core.tty import print_tty, input_tty BAD_INPUT_MESSAGE = "WARNING: Please supply a value from 1 to {}!" @@ -66,7 +66,7 @@ def get_selection(options=None): print_tty("Selection: ", newline=False) try: - selection = input() + selection = input_tty() except KeyboardInterrupt: print_tty() sys.exit(1) diff --git a/aws_okta_processor/core/saml.py b/aws_okta_processor/core/saml.py index 3583b53..91968d4 100644 --- a/aws_okta_processor/core/saml.py +++ b/aws_okta_processor/core/saml.py @@ -8,7 +8,7 @@ from bs4 import BeautifulSoup from collections import OrderedDict -from aws_okta_processor.core.print_tty import print_tty +from aws_okta_processor.core.tty import print_tty SAML_ATTRIBUTE = '{urn:oasis:names:tc:SAML:2.0:assertion}Attribute' diff --git a/aws_okta_processor/core/print_tty.py b/aws_okta_processor/core/tty.py similarity index 68% rename from aws_okta_processor/core/print_tty.py rename to aws_okta_processor/core/tty.py index ee1a914..8da8ca4 100644 --- a/aws_okta_processor/core/print_tty.py +++ b/aws_okta_processor/core/tty.py @@ -8,6 +8,54 @@ import sys +def import_msvcrt(): + import msvcrt + return msvcrt + + +def input_tty(): + try: + msvcrt = import_msvcrt() + except ImportError: + return unix_input_tty() + else: + return win_input_tty(msvcrt) + + +def unix_input_tty(): + with contextlib2.ExitStack() as stack: + try: + fd = os.open('/dev/tty', os.O_RDWR | os.O_NOCTTY) + tty = io.FileIO(fd, 'r+') + stack.enter_context(tty) + input = io.TextIOWrapper(tty) + stack.enter_context(input) + except OSError: + stack.close() + input = sys.stdin + + line = input.readline() + if line[-1] == '\n': + line = line[:-1] + return line + + +def win_input_tty(msvcrt): + pw = "" + while 1: + c = msvcrt.getwch() + if c == '\r' or c == '\n': + break + if c == '\003': + raise KeyboardInterrupt + if c == '\b': + pw = pw[:-1] + else: + pw = pw + c + + return pw + + def unix_print_tty(string='', indents=0, newline=True): with contextlib2.ExitStack() as stack: string = indent(indents) + string @@ -65,11 +113,6 @@ def indent(indents=None): return indent -def import_msvcrt(): - import msvcrt - return msvcrt - - def print_tty(string='', indents=0, newline=True, silent=False): try: msvcrt = import_msvcrt() diff --git a/tests/core/test_fetcher.py b/tests/core/test_fetcher.py index a957c23..234617d 100644 --- a/tests/core/test_fetcher.py +++ b/tests/core/test_fetcher.py @@ -49,7 +49,7 @@ def test_get_app_roles(self, mock_get_app_roles): @patch("boto3.client") @patch('aws_okta_processor.core.fetcher.print_tty') @patch('aws_okta_processor.core.fetcher.prompt.print_tty') - @patch('aws_okta_processor.core.fetcher.prompt.input', return_value='1') + @patch('aws_okta_processor.core.fetcher.prompt.input_tty', return_value='1') @patch('aws_okta_processor.core.fetcher.Okta') def test_fetcher_should_filter_accounts( self, @@ -104,7 +104,7 @@ def assume_role_side_effect(*args, **kwargs): @patch("boto3.client") @patch('aws_okta_processor.core.fetcher.print_tty') @patch('aws_okta_processor.core.fetcher.prompt.print_tty') - @patch('aws_okta_processor.core.fetcher.prompt.input', return_value='1') + @patch('aws_okta_processor.core.fetcher.prompt.input_tty', return_value='1') @patch('aws_okta_processor.core.fetcher.Okta') def test_fetcher_should_prompt_all_accounts( self, @@ -155,4 +155,4 @@ def assume_role_side_effect(*args, **kwargs): call('Account: 2', indents=0), call('[ 3 ] Role-One', indents=1), call('Selection: ', newline=False) - ]) \ No newline at end of file + ]) diff --git a/tests/core/test_okta.py b/tests/core/test_okta.py index f40c70d..49527cf 100644 --- a/tests/core/test_okta.py +++ b/tests/core/test_okta.py @@ -252,7 +252,7 @@ def test_okta_mfa_push_challenge( self.assertEqual(okta.organization, "organization.okta.com") self.assertEqual(okta.okta_session_id, "session_token") - @patch('aws_okta_processor.core.okta.input') + @patch('aws_okta_processor.core.okta.input_tty') @patch('aws_okta_processor.core.okta.os.chmod') @patch('aws_okta_processor.core.okta.open') @patch('aws_okta_processor.core.okta.os.makedirs') @@ -298,7 +298,7 @@ def test_okta_mfa_totp_challenge( @patch('aws_okta_processor.core.okta.Okta.get_okta_single_use_token') @patch('aws_okta_processor.core.okta.Okta.create_and_store_okta_session') - @patch('aws_okta_processor.core.okta.input') + @patch('aws_okta_processor.core.okta.input_tty') def test_read_aop_from_okta_session_should_read_aop_options( self, mock_input, @@ -326,7 +326,7 @@ def test_read_aop_from_okta_session_should_read_aop_options( @patch('aws_okta_processor.core.okta.Okta.get_cache_file_path', return_value='/tmp/test.json') @patch('aws_okta_processor.core.okta.Okta.get_okta_single_use_token') @patch('aws_okta_processor.core.okta.Okta.create_and_store_okta_session') - @patch('aws_okta_processor.core.okta.input') + @patch('aws_okta_processor.core.okta.input_tty') @patch('builtins.open', new_callable=mock_open) def test_set_okta_session_should_write_session_data( self, @@ -368,7 +368,7 @@ def test_set_okta_session_should_write_session_data( call('}') ]) - @patch('aws_okta_processor.core.okta.input') + @patch('aws_okta_processor.core.okta.input_tty') @patch('aws_okta_processor.core.okta.os.chmod') @patch('aws_okta_processor.core.okta.open') @patch('aws_okta_processor.core.okta.os.makedirs') @@ -412,7 +412,7 @@ def test_okta_mfa_hardware_token_challenge( self.assertEqual(okta.organization, "organization.okta.com") self.assertEqual(okta.okta_session_id, "session_token") - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') @patch('aws_okta_processor.core.okta.os.chmod') @patch('aws_okta_processor.core.okta.open') @patch('aws_okta_processor.core.okta.os.makedirs') diff --git a/tests/core/test_prompt.py b/tests/core/test_prompt.py index a890034..2832e9b 100644 --- a/tests/core/test_prompt.py +++ b/tests/core/test_prompt.py @@ -65,7 +65,7 @@ def test_get_item_config_no_match(self, mock_sys, mock_print_tty): ) @patch('aws_okta_processor.core.prompt.print_tty') - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') def test_get_selection(self, mock_input, mock_print_tty): mock_input.return_value = 1 options = ["one", "two"] @@ -77,7 +77,7 @@ def test_get_selection(self, mock_input, mock_print_tty): ) @patch('aws_okta_processor.core.prompt.print_tty') - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') def test_get_selection_bad_input(self, mock_input, mock_print_tty): mock_input.side_effect = ["bad_input", 2] options = ["one", "two"] @@ -89,7 +89,7 @@ def test_get_selection_bad_input(self, mock_input, mock_print_tty): ) @patch('aws_okta_processor.core.prompt.print_tty') - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') def test_get_selection_bad_int(self, mock_input, mock_print_tty): mock_input.side_effect = [0, 2] options = ["one", "two"] @@ -101,7 +101,7 @@ def test_get_selection_bad_int(self, mock_input, mock_print_tty): ) @patch('aws_okta_processor.core.prompt.print_tty') - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') def test_get_selection_no_input(self, mock_input, mock_print_tty): mock_input.side_effect = [SyntaxError, 2] options = ["one", "two"] @@ -114,7 +114,7 @@ def test_get_selection_no_input(self, mock_input, mock_print_tty): @patch('aws_okta_processor.core.prompt.print_tty') @patch('aws_okta_processor.core.prompt.sys') - @patch('aws_okta_processor.core.prompt.input') + @patch('aws_okta_processor.core.prompt.input_tty') def test_get_selection_interrupt(self, mock_input, mock_sys, mock_print_tty): # noqa mock_input.side_effect = [KeyboardInterrupt, 2] options = ["one"] diff --git a/tests/core/test_print_tty.py b/tests/core/test_tty.py similarity index 61% rename from tests/core/test_print_tty.py rename to tests/core/test_tty.py index 750c0f6..f3ad77d 100644 --- a/tests/core/test_print_tty.py +++ b/tests/core/test_tty.py @@ -1,16 +1,20 @@ +import os +import sys +import unittest + from unittest import TestCase from mock import patch from mock import call from mock import MagicMock -import aws_okta_processor.core.print_tty as print_tty +import aws_okta_processor.core.tty as tty -class Test(TestCase): - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') +class UnixTtyTests(TestCase): + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty( self, mock_io, @@ -29,14 +33,14 @@ def test_unix_print_tty( call(u'\n') ] - print_tty.print_tty("STRING") + tty.print_tty("STRING") mock_text_wrapper.write.assert_has_calls(calls) mock_os.open.assert_called_once() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty_no_newline( self, mock_io, @@ -50,16 +54,16 @@ def test_unix_print_tty_no_newline( mock_text_wrapper = MagicMock() mock_io.TextIOWrapper.return_value = mock_text_wrapper - print_tty.print_tty("STRING", newline=False) + tty.print_tty("STRING", newline=False) mock_os.open.assert_called_once() mock_stack.enter_context.called_once_with(mock_text_wrapper) mock_text_wrapper.write.assert_called_once_with(u'STRING') mock_text_wrapper.flush.assert_called_once() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty_indent( self, mock_io, @@ -73,17 +77,17 @@ def test_unix_print_tty_indent( mock_text_wrapper = MagicMock() mock_io.TextIOWrapper.return_value = mock_text_wrapper - print_tty.print_tty("STRING", indents=1, newline=False) + tty.print_tty("STRING", indents=1, newline=False) mock_os.open.assert_called_once() mock_stack.enter_context.called_once_with(mock_text_wrapper) mock_text_wrapper.write.assert_called_once_with(u' STRING') mock_text_wrapper.flush.assert_called_once() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.sys.stdout') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.sys.stdout') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty_print( self, mock_io, @@ -104,16 +108,16 @@ def test_unix_print_tty_print( call(u'\n') ] - print_tty.print_tty("STRING") + tty.print_tty("STRING") mock_print.write.assert_has_calls(calls) mock_stack.close.assert_called_once() mock_text_wrapper.write.assert_not_called() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.sys.stdout') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.sys.stdout') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty_print_no_newline( self, mock_io, @@ -129,16 +133,16 @@ def test_unix_print_tty_print_no_newline( mock_io.TextIOWrapper.return_value = mock_text_wrapper mock_os.open.side_effect = OSError - print_tty.print_tty("STRING", newline=False) + tty.print_tty("STRING", newline=False) mock_print.write.assert_called_once_with("STRING") mock_stack.close.assert_called_once() mock_text_wrapper.write.assert_not_called() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') - @patch('aws_okta_processor.core.print_tty.sys.stdout') - @patch('aws_okta_processor.core.print_tty.contextlib2') - @patch('aws_okta_processor.core.print_tty.os') - @patch('aws_okta_processor.core.print_tty.io') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.sys.stdout') + @patch('aws_okta_processor.core.tty.contextlib2') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io') def test_unix_print_tty_print_indent( self, mock_io, @@ -154,12 +158,48 @@ def test_unix_print_tty_print_indent( mock_io.TextIOWrapper.return_value = mock_text_wrapper mock_os.open.side_effect = OSError - print_tty.print_tty("STRING", indents=1, newline=False) + tty.print_tty("STRING", indents=1, newline=False) mock_print.write.assert_called_once_with(" STRING") mock_stack.close.assert_called_once() mock_text_wrapper.write.assert_not_called() - @patch('aws_okta_processor.core.print_tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.io.FileIO') + @patch('aws_okta_processor.core.tty.io.TextIOWrapper') + def test_unix_input_tty( + self, + mock_textio, + mock_fileio, + mock_os, + mock_import_msvcrt + ): + mock_import_msvcrt.side_effect = ImportError + + tty.input_tty() + mock_fileio.assert_called_once_with(mock_os.open.return_value, 'r+') + mock_textio.assert_called_once_with(mock_fileio.return_value) + + @patch('aws_okta_processor.core.tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.os') + @patch('aws_okta_processor.core.tty.sys.stdin.readline') + def test_unix_input_tty_input( + self, + mock_readline, + mock_os, + mock_import_msvcrt + ): + mock_import_msvcrt.side_effect = ImportError + mock_os.open.side_effect = IOError + mock_readline.return_value = 'return-value' + + actual = tty.input_tty() + + self.assertEqual(actual, 'return-value') + + +class WindowsTtyTests(unittest.TestCase): + @patch('aws_okta_processor.core.tty.import_msvcrt') def test_win_print_tty(self, mock_import_msvcrt): mock_msvcrt = MagicMock() mock_import_msvcrt.return_value = mock_msvcrt @@ -168,14 +208,14 @@ def test_win_print_tty(self, mock_import_msvcrt): calls[0].append(call(char)) calls[1].append(call(bytes(char.encode()))) - print_tty.print_tty("STRING") + tty.print_tty("STRING") try: mock_msvcrt.putch.assert_has_calls(calls[0]) except AssertionError: mock_msvcrt.putch.assert_has_calls(calls[1]) - @patch('aws_okta_processor.core.print_tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.import_msvcrt') def test_win_print_tty_no_newline(self, mock_import_msvcrt): mock_msvcrt = MagicMock() mock_import_msvcrt.return_value = mock_msvcrt @@ -184,7 +224,7 @@ def test_win_print_tty_no_newline(self, mock_import_msvcrt): calls[0].append(call(char)) calls[1].append(call(bytes(char.encode()))) - print_tty.print_tty("STRING", newline=False) + tty.print_tty("STRING", newline=False) mock_import_msvcrt.assert_called() try: @@ -192,7 +232,7 @@ def test_win_print_tty_no_newline(self, mock_import_msvcrt): except AssertionError: mock_msvcrt.putch.assert_has_calls(calls[1]) - @patch('aws_okta_processor.core.print_tty.import_msvcrt') + @patch('aws_okta_processor.core.tty.import_msvcrt') def test_win_print_tty_indent(self, mock_import_msvcrt): mock_msvcrt = MagicMock() mock_import_msvcrt.return_value = mock_msvcrt @@ -201,10 +241,19 @@ def test_win_print_tty_indent(self, mock_import_msvcrt): calls[0].append(call(char)) calls[1].append(call(bytes(char.encode()))) - print_tty.print_tty("STRING", indents=1, newline=False) + tty.print_tty("STRING", indents=1, newline=False) mock_import_msvcrt.assert_called() try: mock_msvcrt.putch.assert_has_calls(calls[0]) except AssertionError: mock_msvcrt.putch.assert_has_calls(calls[1]) + + @patch('aws_okta_processor.core.tty.import_msvcrt') + def test_win_input_tty(self, mock_import_msvcrt): + mock_msvcrt = MagicMock() + mock_import_msvcrt.return_value = mock_msvcrt + mock_msvcrt.getwch.side_effect = ['a','b','c','\n'] + actual = tty.input_tty() + + self.assertEqual(actual, 'abc')