diff --git a/README.md b/README.md index d2e13e3..c088c66 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -[![Apache](https://img.shields.io/badge/license-Apache-blue.svg)](https://github.com/nathan-v/aws_okta_keyman/blob/master/LICENSE.txt) [![PyPI version](https://badge.fury.io/py/aws-okta-keyman.svg)](https://badge.fury.io/py/aws-okta-keyman) [![Python versions](https://img.shields.io/pypi/pyversions/aws-okta-keyman.svg?style=flat-square)](https://pypi.python.org/pypi/aws-okta-keyman/0.2.0) +[![Apache](https://img.shields.io/badge/license-Apache-blue.svg)](https://github.com/nathan-v/aws_okta_keyman/blob/master/LICENSE.txt) [![PyPI version](https://badge.fury.io/py/aws-okta-keyman.svg)](https://badge.fury.io/py/aws-okta-keyman) [![Python versions](https://img.shields.io/pypi/pyversions/aws-okta-keyman.svg?style=flat-square)](https://pypi.python.org/pypi/aws-okta-keyman/0.2.0) [![Downloads](http://pepy.tech/badge/aws-okta-keyman)](http://pepy.tech/count/aws-okta-keyman) -[![CircleCI](https://circleci.com/gh/nathan-v/aws_okta_keyman.svg?style=svg&circle-token=93e91f099440edc9f62378bb3f056af8b0841231)](https://circleci.com/gh/nathan-v/aws_okta_keyman) [![CC GPA](https://codeclimate.com/github/nathan-v/aws_okta_keyman/badges/gpa.svg)](https://codeclimate.com/github/nathan-v/aws_okta_keyman) [![CC Issues](https://codeclimate.com/github/nathan-v/aws_okta_keyman/badges/issue_count.svg)](https://codeclimate.com/github/nathan-v/aws_okta_keyman) [![Coverage Status](https://coveralls.io/repos/github/nathan-v/aws_okta_keyman/badge.svg?branch=master)](https://coveralls.io/github/nathan-v/aws_okta_keyman?branch=master) +[![CircleCI](https://circleci.com/gh/nathan-v/aws_okta_keyman/tree/master.svg?style=svg&circle-token=93e91f099440edc9f62378bb3f056af8b0841231)](https://circleci.com/gh/nathan-v/aws_okta_keyman/tree/master) [![CC GPA](https://codeclimate.com/github/nathan-v/aws_okta_keyman/badges/gpa.svg)](https://codeclimate.com/github/nathan-v/aws_okta_keyman) [![CC Issues](https://codeclimate.com/github/nathan-v/aws_okta_keyman/badges/issue_count.svg)](https://codeclimate.com/github/nathan-v/aws_okta_keyman) [![Coverage Status](https://coveralls.io/repos/github/nathan-v/aws_okta_keyman/badge.svg?branch=master)](https://coveralls.io/github/nathan-v/aws_okta_keyman?branch=master) # AWS Okta Keyman @@ -31,6 +31,13 @@ In the case of Duo Auth a web page is opened (served locally) for the user to interact with Duo and select their preferred authentication method. Once Duo is successful the user may close the browser or tab. +### Supported MFA Solutions + +* Okta Verify +* Duo Auth +* Okta OTP +* Google Auth OTP + ## Multiple AWS Roles AWS Okta Keyman supports multiple AWS roles when configued. The user is prompted to @@ -95,6 +102,15 @@ accounts: # Usage +## Client Setup + +Before you can install this tool you need to have a working Python installation with pip. +If you're not sure if you have this a good place to start would be the [Python Beginner's Guide](https://wiki.python.org/moin/BeginnersGuide/Download) . + +Once your Python environment is configured simply run `pip install aws-okta-keyman` to install the tool. + +## Running AWS Okta Keyman + For detailed usage instructions, see the `--help` commandline argument. Typical usage: diff --git a/aws_okta_keyman/__init__.py b/aws_okta_keyman/__init__.py index cbe7277..ef1f829 100644 --- a/aws_okta_keyman/__init__.py +++ b/aws_okta_keyman/__init__.py @@ -12,3 +12,4 @@ # # Copyright 2018 Nextdoor.com, Inc # Copyright 2018 Nathan V +"""Empty init.""" diff --git a/aws_okta_keyman/aws.py b/aws_okta_keyman/aws.py index 3c07878..43f5689 100644 --- a/aws_okta_keyman/aws.py +++ b/aws_okta_keyman/aws.py @@ -22,15 +22,17 @@ to AWS to get them. """ from __future__ import unicode_literals -from builtins import str + import configparser import datetime import logging import os -from os.path import expanduser import xml +from builtins import str +from os.path import expanduser import boto3 + from aws_okta_keyman.aws_saml import SamlAssertion LOG = logging.getLogger(__name__) diff --git a/aws_okta_keyman/config.py b/aws_okta_keyman/config.py index 0259485..03b89e9 100644 --- a/aws_okta_keyman/config.py +++ b/aws_okta_keyman/config.py @@ -22,6 +22,7 @@ import os import yaml + from aws_okta_keyman.metadata import __version__ LOG = logging.getLogger(__name__) @@ -196,20 +197,29 @@ def optional_args(optional_args): ), default=False) - def parse_config(self, filename): - """Parse a configuration file and set the variables from it.""" + @staticmethod + def read_yaml(filename, raise_on_error=False): + """Read a YAML file and optionally raise if anything goes wrong.""" + config = {} try: if os.path.isfile(filename): config = yaml.load(open(filename, 'r')) LOG.debug("YAML loaded config: {}".format(config)) else: - raise IOError("File not found: {}".format(filename)) + if raise_on_error: + raise IOError("File not found: {}".format(filename)) except (yaml.parser.ParserError, yaml.scanner.ScannerError): LOG.error('Error parsing config file; invalid YAML.') - raise + if raise_on_error: + raise + return config + + def parse_config(self, filename): + """Parse a configuration file and set the variables from it.""" + config = self.read_yaml(filename, raise_on_error=True) for key, value in config.items(): - if getattr(self, key) is None: # Only overwrite None not args + if not getattr(self, key): # Only overwrite None not args setattr(self, key, value) def write_config(self): @@ -217,15 +227,7 @@ def write_config(self): config file. """ file_path = os.path.expanduser(self.writepath) - try: - if os.path.isfile(file_path): - config = yaml.load(open(file_path, 'r')) - LOG.debug("YAML loaded config: {}".format(config)) - else: - config = {} - except (yaml.parser.ParserError, yaml.scanner.ScannerError): - config = {} - LOG.error('Error parsing config file; invalid YAML.') + config = self.read_yaml(file_path) args_dict = dict(vars(self)) @@ -236,8 +238,16 @@ def write_config(self): if args_dict[key] is not None: setattr(self, key, args_dict[key]) - config = dict(vars(self)) - # Remove args we don't want to save to a config file + config_out = self.clean_config_for_write(dict(vars(self))) + + LOG.debug("YAML being saved: {}".format(config_out)) + + with open(file_path, 'w') as outfile: + yaml.safe_dump(config_out, outfile, default_flow_style=False) + + @staticmethod + def clean_config_for_write(config): + """Remove args we don't want to save to a config file.""" ignore = ['name', 'appid', 'argv', 'writepath', 'config', 'debug', 'oktapreview'] for var in ignore: @@ -246,7 +256,4 @@ def write_config(self): if config['accounts'] is None: del config['accounts'] - LOG.debug("YAML being saved: {}".format(config)) - - with open(file_path, 'w') as outfile: - yaml.safe_dump(config, outfile, default_flow_style=False) + return config diff --git a/aws_okta_keyman/duo.py b/aws_okta_keyman/duo.py index 55eda0d..9741e69 100644 --- a/aws_okta_keyman/duo.py +++ b/aws_okta_keyman/duo.py @@ -13,12 +13,13 @@ # Copyright 2018 Nathan V """All the Duo things.""" -from multiprocessing import Process import sys import time +from multiprocessing import Process + if sys.version_info[0] < 3: # pragma: no cover from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -else: +else: # pragma: no cover from http.server import HTTPServer, BaseHTTPRequestHandler diff --git a/aws_okta_keyman/keyman.py b/aws_okta_keyman/keyman.py new file mode 100644 index 0000000..27f787a --- /dev/null +++ b/aws_okta_keyman/keyman.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Copyright 2018 Nextdoor.com, Inc +# Copyright 2018 Nathan V +"""This module contains the primary logic of the tool.""" +from __future__ import unicode_literals + +import getpass +import logging +import sys +import time +from builtins import input + +import rainbow_logging_handler +import requests + +from aws_okta_keyman import aws, okta +from aws_okta_keyman.config import Config +from aws_okta_keyman.metadata import __desc__, __version__ + + +class Keyman: + """Main class for the tool.""" + + def __init__(self, argv): + self.okta_client = None + self.log = self.setup_logging() + self.log.info('{} v{}'.format(__desc__, __version__)) + self.config = Config(argv) + try: + self.config.get_config() + except ValueError as err: + self.log.fatal(err) + sys.exit(1) + if self.config.debug: + self.log.setLevel(logging.DEBUG) + + def main(self): + """Execute primary logic path.""" + try: + # If there's no appid try to select from accounts in config file + self.handle_appid_selection() + + # get user password + password = self.user_password() + + # Generate our initial OktaSaml client + self.init_okta(password) + + # Authenticate to Okta + self.auth_okta() + + # Start the AWS session and loop (if using reup) + self.aws_auth_loop() + + except KeyboardInterrupt: + # Allow users to exit cleanly at any time. + print('') + self.log.info('Exiting after keyboard interrupt.') + sys.exit(1) + + @staticmethod + def setup_logging(): + """Return back a pretty color-coded logger.""" + logger = logging.getLogger() + logger.setLevel(logging.INFO) + handler = rainbow_logging_handler.RainbowLoggingHandler(sys.stdout) + fmt = '%(asctime)-10s (%(levelname)s) %(message)s' + formatter = logging.Formatter(fmt) + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + @staticmethod + def user_input(text): + """Wrap input() making testing support of py2 and py3 easier.""" + return input(text) + + @staticmethod + def user_password(): + """Wrap getpass to simplify testing.""" + return getpass.getpass() + + def selector_menu(self, options, key, key_name): + """Show a selection menu from a dict so the user can pick something.""" + selection = -1 + while selection < 0 or selection > len(options): + for index, option in enumerate(options): + print("[{}] {}: {}".format(index, key_name, option[key])) + selection = int(self.user_input("{} selection: ".format(key_name))) + return selection + + def handle_appid_selection(self): + """If we have no appid specified and we have accounts from a config + file display the options to the user and select one + """ + if self.config.appid is None and self.config.accounts: + msg = 'No app ID provided; select from available AWS accounts' + self.log.warning(msg) + accts = self.config.accounts + acct_selection = self.selector_menu(accts, 'name', 'Account') + self.config.set_appid_from_account_id(acct_selection) + msg = "Using account: {} / {}".format( + accts[acct_selection]["name"], accts[acct_selection]["appid"] + ) + self.log.info(msg) + + def init_okta(self, password): + """Initialize the Okta client or exit if the client received an empty + input value + """ + try: + if self.config.oktapreview is True: + self.okta_client = okta.OktaSaml(self.config.org, + self.config.username, + password, oktapreview=True) + else: + self.okta_client = okta.OktaSaml(self.config.org, + self.config.username, + password) + + except okta.EmptyInput: + self.log.fatal('Cannot enter a blank string for any input') + sys.exit(1) + + def auth_okta(self): + """Authenticate the Okta client. Prompt for MFA if necessary""" + try: + self.okta_client.auth() + except okta.InvalidPassword: + self.log.fatal('Invalid Username ({user}) or Password'.format( + user=self.config.username + )) + sys.exit(1) + except okta.PasscodeRequired as err: + self.log.warning( + "MFA Requirement Detected - Enter your {} code here".format( + err.provider + ) + ) + verified = False + while not verified: + passcode = self.user_input('MFA Passcode: ') + verified = self.okta_client.validate_mfa(err.fid, + err.state_token, + passcode) + except okta.UnknownError as err: + self.log.fatal("Fatal error: {}".format(err)) + sys.exit(1) + + def handle_multiple_roles(self, session): + """If there's more than one role available from AWS present the user + with a list to pick from + """ + self.log.warning('Multiple AWS roles found; please select one') + roles = session.available_roles() + role_selection = self.selector_menu(roles, 'role', 'Role') + session.set_role(role_selection) + session.assume_role() + return role_selection + + def start_session(self): + """Initialize AWS session object.""" + assertion = self.okta_client.get_assertion(appid=self.config.appid, + apptype='amazon_aws') + return aws.Session(assertion, profile=self.config.name) + + def aws_auth_loop(self): + """Once we're authenticated with an OktaSaml client object we use that + object to get a fresh SAMLResponse repeatedly and refresh our AWS + Credentials. + """ + session = None + role_selection = None + retries = 0 + while True: + # If we have a session and it's valid take a nap + if session and session.is_valid: + self.log.debug('Credentials are still valid, sleeping') + time.sleep(15) + continue + + self.log.info('Getting SAML Assertion from {org}'.format( + org=self.config.org)) + + try: + session = self.start_session() + + # If role_selection is set we're in a reup loop. Re-set the + # role on the session to prevent the user being prompted for + # the role again on each subsequent renewal. + if role_selection is not None: + session.set_role(role_selection) + + session.assume_role() + + except aws.MultipleRoles: + role_selection = self.handle_multiple_roles(session) + except requests.exceptions.ConnectionError: + self.log.warning('Connection error... will retry') + time.sleep(5) + continue + except aws.InvalidSaml: + self.log.error('AWS SAML response invalid. Retrying...') + time.sleep(1) + retries += 1 + if retries > 2: + self.log.fatal('SAML failure. Please reauthenticate.') + sys.exit(1) + continue + + # If we're not running in re-up mode, once we have the assertion + # and creds, go ahead and quit. + if not self.config.reup: + return + + self.log.info('Reup enabled, sleeping...') + time.sleep(5) diff --git a/aws_okta_keyman/main.py b/aws_okta_keyman/main.py index 6a735d6..dac5e57 100644 --- a/aws_okta_keyman/main.py +++ b/aws_okta_keyman/main.py @@ -14,171 +14,19 @@ # # Copyright 2018 Nextdoor.com, Inc # Copyright 2018 Nathan V +"""Main function that passes off to the Keyman module.""" from __future__ import unicode_literals -import getpass -import logging -import sys -import time -import requests -from builtins import input - -import rainbow_logging_handler - -from aws_okta_keyman import okta -from aws_okta_keyman import aws -from aws_okta_keyman.config import Config -from aws_okta_keyman.metadata import __desc__, __version__ - - -def user_input(text): - '''Wraps input() making testing support of py2 and py3 easier''' - return input(text) - - -def setup_logging(): - '''Returns back a pretty color-coded logger''' - logger = logging.getLogger() - logger.setLevel(logging.INFO) - handler = rainbow_logging_handler.RainbowLoggingHandler(sys.stdout) - fmt = '%(asctime)-10s (%(levelname)s) %(message)s' - formatter = logging.Formatter(fmt) - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - - -def main(argv): - # Generate our logger first, and write out our app name and version - log = setup_logging() - log.info('%s v%s' % (__desc__, __version__)) - - # Get our configuration object based on the CLI options. This handles - # parsing arguments and ensuring the user supplied the required params. - config = Config(argv) - try: - config.get_config() - except ValueError as err: - log.fatal(err) - sys.exit(1) - - if config.appid is None and config.accounts: - msg = 'No app ID provided; please select from available AWS accounts' - log.warning(msg) - accts = config.accounts - for acct_index, role in enumerate(accts): - print("[{}] Account: {}".format(acct_index, role["name"])) - acct_selection = int(user_input('Select an account from above: ')) - config.set_appid_from_account_id(acct_selection) - msg = "Using account: {} / {}".format(accts[acct_selection]["name"], - accts[acct_selection]["appid"]) - log.info(msg) - - if config.debug: - log.setLevel(logging.DEBUG) - # Ask the user for their password.. we do this once at the beginning, and - # we keep it in memory for as long as this tool is running. Its never ever - # written out or cached to disk anywhere. - try: - password = getpass.getpass() - except KeyboardInterrupt: - print('') - sys.exit(1) - - # Generate our initial OktaSaml client and handle any exceptions thrown. - # Generally these are input validation issues. - try: - if config.oktapreview is True: - okta_client = okta.OktaSaml(config.org, config.username, password, - oktapreview=True) - else: - okta_client = okta.OktaSaml(config.org, config.username, password) - except okta.EmptyInput: - log.fatal('Cannot enter a blank string for any input') - sys.exit(1) - - # Authenticate the Okta client. If necessary, we will ask for MFA input. - try: - okta_client.auth() - except okta.InvalidPassword: - log.fatal('Invalid Username ({user}) or Password'.format( - user=config.username)) - sys.exit(1) - except okta.PasscodeRequired as e: - log.warning('MFA Requirement Detected - Enter your passcode here') - verified = False - while not verified: - passcode = user_input('MFA Passcode: ') - verified = okta_client.validate_mfa(e.fid, e.state_token, passcode) - except okta.UnknownError as err: - log.fatal("Fatal error: {}".format(err)) - sys.exit(1) - - # Once we're authenticated with an OktaSaml client object, we can use that - # object to get a fresh SAMLResponse repeatedly and refresh our AWS - # Credentials. - session = None - role_selection = None - retries = 0 - while True: - # If an AWS Session object has been created already, lets check if its - # still valid. If it is, sleep a bit and skip to the next execution of - # the loop. - if session and session.is_valid: - log.debug('Credentials are still valid, sleeping') - time.sleep(15) - continue - - log.info('Getting SAML Assertion from {org}'.format( - org=config.org)) - - try: - assertion = okta_client.get_assertion(appid=config.appid, - apptype='amazon_aws') - session = aws.Session(assertion, profile=config.name) - - # If role_selection is set we're in a reup loop. Re-set the role on - # the session to prevent the user being prompted for the role again - # on each subsequent renewal. - if role_selection is not None: - session.set_role(role_selection) - - session.assume_role() - - except aws.MultipleRoles: - log.warning('Multiple AWS roles found; please select one') - roles = session.available_roles() - for role_index, role in enumerate(roles): - print("[{}] Role: {}".format(role_index, role["role"])) - role_selection = user_input('Select a role from above: ') - session.set_role(role_selection) - session.assume_role() - except requests.exceptions.ConnectionError as e: - log.warning('Connection error... will retry') - time.sleep(5) - continue - - except aws.InvalidSaml: - log.error('SAML response from AWS is invalid. Retrying...') - time.sleep(1) - retries += 1 - if retries > 2: - log.fatal('SAML failure. Please reauthenticate.') - sys.exit(1) - - # If we're not running in re-up mode, once we have the assertion - # and creds, go ahead and quit. - if not config.reup: - break +import sys - log.info('Reup enabled, sleeping...') - time.sleep(5) +from aws_okta_keyman.keyman import Keyman def entry_point(): """Zero-argument entry point for use with setuptools/distribute.""" - raise SystemExit(main(sys.argv)) + keyman = Keyman(sys.argv) + raise SystemExit(keyman.main()) if __name__ == '__main__': diff --git a/aws_okta_keyman/metadata.py b/aws_okta_keyman/metadata.py index 0f2674d..ae3d977 100644 --- a/aws_okta_keyman/metadata.py +++ b/aws_okta_keyman/metadata.py @@ -14,7 +14,7 @@ # Copyright 2018 Nathan V """Package metadata.""" -__version__ = '0.3.2' +__version__ = '0.3.3' __desc__ = 'AWS Okta Keyman' __desc_long__ = (''' =============== diff --git a/aws_okta_keyman/okta.py b/aws_okta_keyman/okta.py index 243819f..222c428 100644 --- a/aws_okta_keyman/okta.py +++ b/aws_okta_keyman/okta.py @@ -17,57 +17,60 @@ # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - +"""This contains all of the Okta-specific code we need.""" from __future__ import unicode_literals import base64 import logging import time +from multiprocessing import Process import sys +import webbrowser + import bs4 import requests -import webbrowser -from multiprocessing import Process + from aws_okta_keyman.duo import Duo if sys.version_info[0] < 3: # pragma: no cover from exceptions import Exception # Python 2 -log = logging.getLogger(__name__) +LOG = logging.getLogger(__name__) BASE_URL = 'https://{organization}.okta.com' PREVIEW_BASE_URL = 'https://{organization}.oktapreview.com' class BaseException(Exception): - '''Base Exception for Okta Auth''' + """Base Exception for Okta Auth.""" class UnknownError(Exception): - '''Some Expected Return Was Received''' + """Some Expected Return Was Received.""" class EmptyInput(BaseException): - '''Invalid Input - Empty String Detected''' + """Invalid Input - Empty String Detected.""" class InvalidPassword(BaseException): - '''Invalid Password''' + """Invalid Password.""" class PasscodeRequired(BaseException): - '''A 2FA Passcode Must Be Entered''' + """A 2FA Passcode Must Be Entered.""" - def __init__(self, fid, state_token): + def __init__(self, fid, state_token, provider): self.fid = fid self.state_token = state_token + self.provider = provider + super(PasscodeRequired, self).__init__() class OktaVerifyRequired(BaseException): - '''OktaVerify Authentication Is Required''' + """OktaVerify Authentication Is Required.""" class Okta(object): - - '''Base Okta Login Object with MFA handling. + """Base Okta Login Object with MFA handling. This base login object handles connecting to Okta, authenticating a user, and optionally triggering MFA Authentication. No application specific logic @@ -75,7 +78,7 @@ class Okta(object): cookie-authenticated requests.Session() object. See OktaSaml for a more useful object. - ''' + """ def __init__(self, organization, username, password, oktapreview=False): if oktapreview: @@ -83,7 +86,7 @@ def __init__(self, organization, username, password, oktapreview=False): else: self.base_url = BASE_URL.format(organization=organization) - log.debug('Base URL Set to: {url}'.format(url=self.base_url)) + LOG.debug('Base URL Set to: {url}'.format(url=self.base_url)) # Validate the inputs are reasonably sane for input in (organization, username, password): @@ -93,9 +96,10 @@ def __init__(self, organization, username, password, oktapreview=False): self.username = username self.password = password self.session = requests.Session() + self.session_token = None def _request(self, path, data=None): - '''Basic URL Fetcher for Okta + """Make Okta API calls. Any HTTPError is raised immediately, otherwise the response is parsed as JSON and passed back as a dictionary. @@ -106,7 +110,7 @@ def _request(self, path, data=None): Returns: The response in dict form. - ''' + """ headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} @@ -119,13 +123,13 @@ def _request(self, path, data=None): allow_redirects=False) resp_obj = resp.json() - log.debug(resp_obj) + LOG.debug(resp_obj) resp.raise_for_status() return resp_obj def set_token(self, ret): - '''Parses an authentication response and stores the token. + """Parse an authentication response and stores the token. Parses a SUCCESSFUL authentication response from Okta and stores the token. @@ -133,15 +137,15 @@ def set_token(self, ret): args: ret: The response from Okta that we know is successful and contains a sessionToken - ''' - firstName = ret['_embedded']['user']['profile']['firstName'] - lastName = ret['_embedded']['user']['profile']['lastName'] - log.info('Successfully authed {firstName} {lastName}'.format( - firstName=firstName, lastName=lastName)) + """ + first_name = ret['_embedded']['user']['profile']['firstName'] + last_name = ret['_embedded']['user']['profile']['lastName'] + LOG.info('Successfully authed {first_name} {last_name}'.format( + first_name=first_name, last_name=last_name)) self.session_token = ret['sessionToken'] def validate_mfa(self, fid, state_token, passcode): - '''Validates an Okta user with Passcode-based MFA. + """Validate an Okta user with Passcode-based MFA. Takes in the supplied Factor ID (fid), State Token and user supplied Passcode, and validates the auth. If successful, sets the session @@ -155,9 +159,9 @@ def validate_mfa(self, fid, state_token, passcode): Returns: True/False whether or not authentication was successful - ''' + """ if len(passcode) != 6: - log.error('Passcodes must be 6 digits') + LOG.error('Passcodes must be 6 digits') return False path = '/authn/factors/{fid}/verify'.format(fid=fid) @@ -166,17 +170,17 @@ def validate_mfa(self, fid, state_token, passcode): 'passCode': passcode} try: ret = self._request(path, data) - except requests.exceptions.HTTPError as e: - if e.response.status_code == 403: - log.error('Invalid Passcode Detected') + except requests.exceptions.HTTPError as err: + if err.response.status_code == 403: + LOG.error('Invalid Passcode Detected') return False - raise UnknownError(e.response.body) + raise UnknownError(err.response.body) self.set_token(ret) return True - def okta_verify_with_push(self, fid, state_token, sleep=1): - '''Triggers an Okta Push Verification and waits. + def okta_verify(self, fid, state_token): + """Trigger an Okta Push Verification and waits. This method is meant to be called by self.auth() if a Login session requires MFA, and the users profile supports Okta Push with Verify. @@ -189,33 +193,21 @@ def okta_verify_with_push(self, fid, state_token, sleep=1): Args: fid: Okta Factor ID used to trigger the push state_token: State Token allowing us to trigger the push - ''' - log.warning('Okta Verify Push being sent...') + """ + LOG.warning('Okta Verify Push being sent...') path = '/authn/factors/{fid}/verify'.format(fid=fid) data = {'fid': fid, 'stateToken': state_token} ret = self._request(path, data) - while ret['status'] != 'SUCCESS': - log.info('Waiting for Okta Verification...') - time.sleep(sleep) - - if ret.get('factorResult', 'REJECTED') == 'REJECTED': - log.error('Okta Verify Push REJECTED') - return False - - if ret.get('factorResult', 'TIMEOUT') == 'TIMEOUT': - log.error('Okta Verify Push TIMEOUT') - return False - - links = ret.get('_links') - ret = self._request(links['next']['href'], data) - - self.set_token(ret) - return True + ret = self.mfa_wait_loop(ret, data) + if ret: + self.set_token(ret) + return True + return None - def duo_auth(self, uid, fid, state_token, sleep=1): - '''Triggers a Duo Auth request. + def duo_auth(self, fid, state_token): + """Trigger a Duo Auth request. This method is meant to be called by self.auth() if a Login session requires MFA, and the users profile supports Duo Web. @@ -226,11 +218,10 @@ def duo_auth(self, uid, fid, state_token, sleep=1): we will throw an error. If its accepted, we write out our SessionToken. Args: - uid: Okta user ID required by Duo fid: Okta Factor ID used to trigger the push state_token: State Token allowing us to trigger the push - ''' - log.warning('Duo requied; opening browser...') + """ + LOG.warning('Duo required; opening browser...') path = '/authn/factors/{fid}/verify'.format(fid=fid) data = {'fid': fid, 'stateToken': state_token} @@ -238,31 +229,41 @@ def duo_auth(self, uid, fid, state_token, sleep=1): verification = ret['_embedded']['factor']['_embedded']['verification'] duo = Duo(verification, state_token) - p = Process(target=duo.trigger_duo) - p.start() + proc = Process(target=duo.trigger_duo) + proc.start() time.sleep(2) webbrowser.open_new('http://127.0.0.1:65432/duo.html') - while ret['status'] != 'SUCCESS': - log.info('Waiting for Duo Auth success...') - time.sleep(sleep) + ret = self.mfa_wait_loop(ret, data) + if ret: + self.set_token(ret) + return True + return None - if ret.get('factorResult', 'REJECTED') == 'REJECTED': - log.error('Duo Push REJECTED') - return False + def mfa_wait_loop(self, ret, data, sleep=1): + """Wait loop that keeps checking Okta for MFA status.""" + try: + while ret['status'] != 'SUCCESS': + LOG.info('Waiting for MFA success...') + time.sleep(sleep) - if ret.get('factorResult', 'TIMEOUT') == 'TIMEOUT': - log.error('Duo Push TIMEOUT') - return False + if ret.get('factorResult', 'REJECTED') == 'REJECTED': + LOG.error('Duo Push REJECTED') + return None - links = ret.get('_links') - ret = self._request(links['next']['href'], data) + if ret.get('factorResult', 'TIMEOUT') == 'TIMEOUT': + LOG.error('Duo Push TIMEOUT') + return None - self.set_token(ret) - return True + links = ret.get('_links') + ret = self._request(links['next']['href'], data) + return ret + except KeyboardInterrupt: + LOG.info('User canceled waiting for MFA success.') + return None def auth(self): - '''Performs an initial authentication against Okta. + """Perform an initial authentication against Okta. The initial Okta Login authentication is handled here - and optionally MFA authentication is triggered. If successful, this method stores a @@ -270,7 +271,7 @@ def auth(self): "Embed Link" of an Okta Application. **Note ... Undocumented/Unclear Okta Behavior** - If you use the SessionToken only to make your subsequent requests, its + If you use the SessionToken only to make your subsequent requests, it's usable only once and then it expires. However, if you combine it with a long-lived SID cookie (which we do, by using reqests.Session() to make all of our web requests), then that SessionToken can be redeemd many @@ -279,57 +280,69 @@ def auth(self): https://developer.okta.com/use_cases/authentication/ session_cookie#visit-an-embed-link-with-the-session-token - ''' + """ path = '/authn' data = {'username': self.username, 'password': self.password} try: ret = self._request(path, data) - except requests.exceptions.HTTPError as e: - if e.response.status_code == 401: + except requests.exceptions.HTTPError as err: + if err.response.status_code == 401: raise InvalidPassword() status = ret.get('status', None) if status == 'SUCCESS': self.set_token(ret) - return + return None if status == 'MFA_ENROLL' or status == 'MFA_ENROLL_ACTIVATE': - log.warning('User {u} needs to enroll in 2FA first'.format( + LOG.warning('User {u} needs to enroll in 2FA first'.format( u=self.username)) - raise UnknownError() if status == 'MFA_REQUIRED' or status == 'MFA_CHALLENGE': - for factor in ret['_embedded']['factors']: - try: - if factor['factorType'] == 'push': - if self.okta_verify_with_push(factor['id'], - ret['stateToken']): - return - if factor['provider'] == 'DUO': - if self.duo_auth(ret['_embedded']['user']['id'], - factor['id'], - ret['stateToken']): - return - - except KeyboardInterrupt: - # Allow users to use MFA Passcode by - # breaking out of waiting for the push. - break - - for factor in ret['_embedded']['factors']: - if factor['factorType'] == 'token:software:totp': - raise PasscodeRequired( - fid=factor['id'], - state_token=ret['stateToken']) + return self.handle_mfa_response(ret) raise UnknownError(status) + def handle_mfa_response(self, ret): + """In the case of an MFA response evaluate the response and handle + accordingly based on available MFA factors. + """ + otp_possible = False + otp_provider = None + for factor in ret['_embedded']['factors']: + if factor['factorType'] == 'push': + if self.okta_verify(factor['id'], ret['stateToken']): + return True + if factor['provider'] == 'DUO': + if self.duo_auth(factor['id'], ret['stateToken']): + return True + if factor['factorType'] == 'token:software:totp': + # Handle OTP separately in case we can do Okta or Duo but fail + # then we can fall back to OTP + LOG.debug('Software OTP option found') + otp_provider = factor['provider'] + otp_possible = True + + if otp_possible: + raise PasscodeRequired( + fid=factor['id'], + state_token=ret['stateToken'], + provider=otp_provider) + else: + # Log out the factors to make debugging MFA issues easier + LOG.debug("Factors from Okta: {}".format( + ret['_embedded']['factors'])) + LOG.fatal('MFA type in use is unsupported') + raise UnknownError('MFA type in use is unsupported') + class OktaSaml(Okta): + """Handle the SAML part of talking to Okta.""" def assertion(self, saml): + """Parse the assertion from the SAML response.""" assertion = '' soup = bs4.BeautifulSoup(saml, 'html.parser') for inputtag in soup.find_all('input'): @@ -338,18 +351,19 @@ def assertion(self, saml): return base64.b64decode(assertion) def get_assertion(self, appid, apptype): + """Call Okta and get the assertion.""" path = '{url}/home/{apptype}/{appid}'.format( url=self.base_url, apptype=apptype, appid=appid) resp = self.session.get(path, params={'onetimetoken': self.session_token}) - log.debug(resp.__dict__) + LOG.debug(resp.__dict__) try: resp.raise_for_status() except (requests.exceptions.HTTPError, - requests.exceptions.ConnectionError) as e: - log.error('Unknown error: {msg}'.format( - msg=str(e.response.__dict__))) + requests.exceptions.ConnectionError) as err: + LOG.error('Unknown error: {msg}'.format( + msg=str(err.response.__dict__))) raise UnknownError() return self.assertion(resp.text) diff --git a/aws_okta_keyman/test/aws_saml_test.py b/aws_okta_keyman/test/aws_saml_test.py index 167d202..0e8b940 100644 --- a/aws_okta_keyman/test/aws_saml_test.py +++ b/aws_okta_keyman/test/aws_saml_test.py @@ -20,7 +20,9 @@ # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import sys + from aws_okta_keyman.aws_saml import SamlAssertion + if sys.version_info < (2, 7): import unittest2 as unittest else: diff --git a/aws_okta_keyman/test/aws_test.py b/aws_okta_keyman/test/aws_test.py index 240e77b..adde488 100644 --- a/aws_okta_keyman/test/aws_test.py +++ b/aws_okta_keyman/test/aws_test.py @@ -1,8 +1,11 @@ from __future__ import unicode_literals + import datetime -import unittest import sys +import unittest + from aws_okta_keyman import aws + if sys.version_info[0] < 3: # Python 2 import mock else: diff --git a/aws_okta_keyman/test/config_test.py b/aws_okta_keyman/test/config_test.py index fd5918f..201d82f 100644 --- a/aws_okta_keyman/test/config_test.py +++ b/aws_okta_keyman/test/config_test.py @@ -1,9 +1,13 @@ from __future__ import unicode_literals -import unittest + import os import sys +import unittest + import yaml + from aws_okta_keyman.config import Config + if sys.version_info[0] < 3: # Python 2 import mock else: @@ -228,97 +232,143 @@ def test_parse_args_verify_all_parsed_full(self): self.assertEquals(config.reup, True) @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_parse_config(self, isfile_mock): + def test_read_yaml(self, isfile_mock): isfile_mock.return_value = True - - config = Config(['aws_okta_keyman.py']) yml = ("username: user@example.com\n" "org: example\n" "appid: app/id\n") m = mock.mock_open(read_data=yml) with mock.patch('aws_okta_keyman.config.open', m): - config.parse_config('./.config/aws_okta_keyman.yml') + ret = Config.read_yaml('./.config/aws_okta_keyman.yml') - self.assertEquals(config.appid, 'app/id') - self.assertEquals(config.org, 'example') - self.assertEquals(config.username, 'user@example.com') + expected = { + 'username': 'user@example.com', 'org': 'example', 'appid': 'app/id' + } + self.assertEqual(ret, expected) @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_parse_config_args_preferred(self, isfile_mock): - isfile_mock.return_value = True + def test_read_yaml_file_missing_no_raise(self, isfile_mock): + isfile_mock.return_value = False + ret = Config.read_yaml('./.config/aws_okta_keyman.yml') + self.assertEqual(ret, {}) - config = Config(['aws_okta_keyman.py']) - config.appid = 'mysupercoolapp/id' - config.org = 'foobar' - config.username = 'test' + @mock.patch('aws_okta_keyman.config.os.path.isfile') + def test_read_yaml_file_missing_with_raise(self, isfile_mock): + isfile_mock.return_value = False + with self.assertRaises(IOError): + Config.read_yaml('./.config/aws_okta_keyman.yml', + raise_on_error=True) + + @mock.patch('aws_okta_keyman.config.os.path.isfile') + def test_read_yaml_parse_error_no_raise(self, isfile_mock): + isfile_mock.return_value = True yml = ("username: user@example.com\n" "org: example\n" - "appid: app/id\n") + "- appid: foo\n") m = mock.mock_open(read_data=yml) with mock.patch('aws_okta_keyman.config.open', m): - config.parse_config('./.config/aws_okta_keyman.yml') + ret = Config.read_yaml('./.config/aws_okta_keyman.yml') - # Make sure we're getting the args not the config values - self.assertEquals(config.appid, 'mysupercoolapp/id') - self.assertEquals(config.org, 'foobar') - self.assertEquals(config.username, 'test') - - def test_parse_config_file_missing(self): - config = Config(['aws_okta_keyman.py']) - with self.assertRaises(IOError): - config.parse_config('./.config/aws_okta_keyman.yml') + self.assertEqual(ret, {}) @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_parse_config_yaml_scan_error(self, isfile_mock): + def test_read_yaml_parse_error_with_raise(self, isfile_mock): isfile_mock.return_value = True - - config = Config(['aws_okta_keyman.py']) yml = ("username: user@example.com\n" "org: example\n" - "appid app/id\n") + "- appid: foo\n") m = mock.mock_open(read_data=yml) with mock.patch('aws_okta_keyman.config.open', m): - with self.assertRaises(yaml.scanner.ScannerError): - config.parse_config('./.config/aws_okta_keyman.yml') + with self.assertRaises(yaml.parser.ParserError): + Config.read_yaml('./.config/aws_okta_keyman.yml', + raise_on_error=True) @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_parse_config_yaml_parse_error(self, isfile_mock): + def test_read_yaml_scan_error_no_raise(self, isfile_mock): isfile_mock.return_value = True - - config = Config(['aws_okta_keyman.py']) yml = ("username: user@example.com\n" "org: example\n" - "- appid: foo\n") + "appid app/id\n") m = mock.mock_open(read_data=yml) with mock.patch('aws_okta_keyman.config.open', m): - with self.assertRaises(yaml.parser.ParserError): - config.parse_config('./.config/aws_okta_keyman.yml') + ret = Config.read_yaml('./.config/aws_okta_keyman.yml') + + self.assertEqual(ret, {}) @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_write_config(self, isfile_mock): + def test_read_yaml_scan_error_with_raise(self, isfile_mock): isfile_mock.return_value = True + yml = ("username: user@example.com\n" + "org: example\n" + "appid app/id\n") + + m = mock.mock_open(read_data=yml) + with mock.patch('aws_okta_keyman.config.open', m): + with self.assertRaises(yaml.scanner.ScannerError): + Config.read_yaml('./.config/aws_okta_keyman.yml', + raise_on_error=True) + def test_parse_config(self): config = Config(['aws_okta_keyman.py']) + config.read_yaml = mock.MagicMock() + config.read_yaml.return_value = { + 'username': 'user@example.com', + 'org': 'example', + 'appid': 'app/id', + } + + config.parse_config('./.config/aws_okta_keyman.yml') + + self.assertEquals(config.appid, 'app/id') + self.assertEquals(config.org, 'example') + self.assertEquals(config.username, 'user@example.com') + + def test_parse_config_args_preferred(self): + config = Config(['aws_okta_keyman.py']) + config.appid = 'mysupercoolapp/id' + config.org = 'foobar' + config.username = 'test' + config.read_yaml = mock.MagicMock() + config.read_yaml.return_value = { + 'username': 'user@example.com', + 'org': 'example', + 'appid': 'app/id', + } + + config.parse_config('./.config/aws_okta_keyman.yml') + + # Make sure we're getting the args not the config values + self.assertEquals(config.appid, 'mysupercoolapp/id') + self.assertEquals(config.org, 'foobar') + self.assertEquals(config.username, 'test') + + def test_write_config(self): + config = Config(['aws_okta_keyman.py']) + config.clean_config_for_write = mock.MagicMock() + config_clean = { + 'accounts': [{'name': 'Dev', 'appid': 'A123/123'}], + 'org': 'example', + 'reup': None, + 'username': 'example@example.com', + } + config.clean_config_for_write.return_value = config_clean config.writepath = './.config/aws_okta_keyman.yml' config.username = 'example@example.com' - yml = ("username: user@example.com\n" - "org: example\n" - "appid: app/id\n" - "accounts:\n" - " - name: Dev\n" - " appid: A123/123\n") + config.read_yaml = mock.MagicMock() + config.read_yaml.return_value = { + 'username': 'user@example.com', + 'org': 'example', + 'appid': 'app/id', + } - m = mock.mock_open(read_data=yml) + m = mock.mock_open() with mock.patch('aws_okta_keyman.config.open', m): config.write_config() - m.assert_has_calls([ - mock.call('./.config/aws_okta_keyman.yml', 'r'), - ]) m.assert_has_calls([ mock.call(u'./.config/aws_okta_keyman.yml', 'w'), ]) @@ -359,15 +409,21 @@ def test_write_config(self, isfile_mock): mock.call().__exit__(None, None, None) ]) - @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_write_config_new_file(self, isfile_mock): - isfile_mock.return_value = False - + def test_write_config_new_file(self): config = Config(['aws_okta_keyman.py']) + config.clean_config_for_write = mock.MagicMock() + config_clean = { + 'org': 'example', + 'reup': None, + 'username': 'example@example.com', + } + config.clean_config_for_write.return_value = config_clean config.writepath = './.config/aws_okta_keyman.yml' config.username = 'example@example.com' config.appid = 'app/id' config.org = 'example' + config.read_yaml = mock.MagicMock() + config.read_yaml.return_value = {} m = mock.mock_open() with mock.patch('aws_okta_keyman.config.open', m): @@ -394,15 +450,16 @@ def test_write_config_new_file(self, isfile_mock): mock.call().__exit__(None, None, None) ]) - @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_write_config_path_expansion(self, isfile_mock): - isfile_mock.return_value = False - + def test_write_config_path_expansion(self): config = Config(['aws_okta_keyman.py']) + config.clean_config_for_write = mock.MagicMock() + config.clean_config_for_write.return_value = {} config.writepath = '~/.config/aws_okta_keyman.yml' config.username = 'example@example.com' config.appid = 'app/id' config.org = 'example' + config.read_yaml = mock.MagicMock() + config.read_yaml.return_value = {} expected_path = os.path.expanduser(config.writepath) @@ -412,38 +469,43 @@ def test_write_config_path_expansion(self, isfile_mock): m.assert_has_calls([mock.call(expected_path, 'w')]) - @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_write_config_scan_error(self, isfile_mock): - isfile_mock.return_value = True - - config = Config(['aws_okta_keyman.py']) - config.writepath = '~/.config/aws_okta_keyman.yml' - config.username = 'example@example.com' - config.appid = 'app/id' - config.org = 'example' - - yml = ("username: user@example.com\n" - "org: example\n" - "appid app/id\n") - - m = mock.mock_open(read_data=yml) - with mock.patch('aws_okta_keyman.config.open', m): - config.write_config() - - @mock.patch('aws_okta_keyman.config.os.path.isfile') - def test_write_config_parse_error(self, isfile_mock): - isfile_mock.return_value = True - - config = Config(['aws_okta_keyman.py']) - config.writepath = '~/.config/aws_okta_keyman.yml' - config.username = 'example@example.com' - config.appid = 'app/id' - config.org = 'example' - - yml = ("username: user@example.com\n" - "org: example\n" - "- appid: foo\n") - - m = mock.mock_open(read_data=yml) - with mock.patch('aws_okta_keyman.config.open', m): - config.write_config() + def test_clean_config_for_write(self): + config_in = { + 'name': 'foo', + 'appid': 'foo', + 'argv': 'foo', + 'writepath': 'foo', + 'config': 'foo', + 'debug': 'foo', + 'oktapreview': 'foo', + 'accounts': None, + 'shouldstillbehere': 'woohoo' + } + config_out = { + 'shouldstillbehere': 'woohoo' + } + ret = Config.clean_config_for_write(config_in) + self.assertEqual(ret, config_out) + + def test_clean_config_for_write_with_accounts(self): + accounts = [ + {'name': 'Account 1', 'appid': 'ABC123'}, + {'name': 'Account 2', 'appid': 'XYZ890'} + ] + config_in = { + 'name': 'foo', + 'appid': 'foo', + 'argv': 'foo', + 'writepath': 'foo', + 'config': 'foo', + 'debug': 'foo', + 'oktapreview': 'foo', + 'accounts': accounts, + 'shouldstillbehere': 'woohoo' + } + config_out = { + 'accounts': accounts, + 'shouldstillbehere': 'woohoo' + } + ret = Config.clean_config_for_write(config_in) + self.assertEqual(ret, config_out) diff --git a/aws_okta_keyman/test/duo_test.py b/aws_okta_keyman/test/duo_test.py index e2f1264..94956ef 100644 --- a/aws_okta_keyman/test/duo_test.py +++ b/aws_okta_keyman/test/duo_test.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals + import sys import unittest + from aws_okta_keyman import duo + if sys.version_info[0] < 3: import mock # Python 2 from StringIO import StringIO as IO diff --git a/aws_okta_keyman/test/keyman_test.py b/aws_okta_keyman/test/keyman_test.py new file mode 100644 index 0000000..5254145 --- /dev/null +++ b/aws_okta_keyman/test/keyman_test.py @@ -0,0 +1,282 @@ +from __future__ import unicode_literals + +import logging +import sys +import unittest + +from aws_okta_keyman import aws, okta +from aws_okta_keyman.keyman import Keyman + +if sys.version_info[0] < 3: # Python 2 + import mock +else: + from unittest import mock + + +class KeymanTest(unittest.TestCase): + + def test_setup_logging(self): + # Simple execution test - make sure that the logger code executes and + # returns a root logger. No mocks used here, want to ensure that the + # options passed to the logger are valid. + ret = Keyman.setup_logging() + self.assertEquals(type(ret), type(logging.getLogger())) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_init_blank_args(self, _config_mock): + keyman = Keyman(['']) + assert isinstance(keyman, Keyman) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_init_use_debug(self, config_mock): + config_mock().debug = True + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz', '-D']) + log_level = logging.getLevelName(keyman.log.getEffectiveLevel()) + self.assertEqual('DEBUG', log_level) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_init_bad_config(self, config_mock): + config_mock().get_config.side_effect = ValueError + with self.assertRaises(SystemExit): + Keyman([]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_main(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.handle_appid_selection = mock.MagicMock() + keyman.user_password = mock.MagicMock() + keyman.user_password.return_value = 'foo' + keyman.init_okta = mock.MagicMock() + keyman.auth_okta = mock.MagicMock() + keyman.aws_auth_loop = mock.MagicMock() + + keyman.main() + + assert keyman.handle_appid_selection.called + assert keyman.user_password.called + keyman.init_okta.assert_called_with('foo') + assert keyman.auth_okta.called + assert keyman.aws_auth_loop.called + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_main_keyboard_interrupt(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.handle_appid_selection = mock.MagicMock() + keyman.user_password = mock.MagicMock() + keyman.user_password.side_effect = KeyboardInterrupt + + with self.assertRaises(SystemExit): + keyman.main() + + @mock.patch('aws_okta_keyman.keyman.input') + def test_user_input(self, input_mock): + input_mock.return_value = 'test' + self.assertEqual('test', Keyman.user_input('input test')) + + @mock.patch('aws_okta_keyman.keyman.getpass') + def test_user_password(self, pass_mock): + pass_mock.getpass.return_value = 'test' + self.assertEqual('test', Keyman.user_password()) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_selector_menu(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + stdout_mock = mock.Mock() + sys.stdout = stdout_mock + keyman.user_input = mock.MagicMock() + keyman.user_input.return_value = 0 + stuff = [{'artist': 'Metallica'}, + {'artist': 'Soundgarden'}] + ret = keyman.selector_menu(stuff, 'artist', 'Artist') + self.assertEqual(ret, 0) + stdout_mock.assert_has_calls([ + mock.call.write('[0] Artist: Metallica'), + mock.call.write('\n'), + mock.call.write('[1] Artist: Soundgarden'), + mock.call.write('\n') + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_selector_menu_keep_asking_if_out_of_range(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + stdout_mock = mock.Mock() + sys.stdout = stdout_mock + keyman.user_input = mock.MagicMock() + keyman.user_input.side_effect = [99, 98, 0] + stuff = [{'artist': 'Metallica'}, + {'artist': 'Soundgarden'}] + ret = keyman.selector_menu(stuff, 'artist', 'Artist') + self.assertEqual(ret, 0) + keyman.user_input.assert_has_calls([ + mock.call('Artist selection: '), + mock.call('Artist selection: '), + mock.call('Artist selection: ') + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_handle_appid_selection(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar']) + keyman.config.accounts = [{'name': 'myAccount', 'appid': 'myID'}] + keyman.config.appid = None + keyman.selector_menu = mock.MagicMock(name='selector_menu') + keyman.selector_menu.return_value = 0 + keyman.config.set_appid_from_account_id = mock.MagicMock() + + keyman.handle_appid_selection() + + keyman.selector_menu.assert_has_calls([ + mock.call( + [{'name': 'myAccount', 'appid': 'myID'}], + 'name', 'Account') + ]) + keyman.config.set_appid_from_account_id.assert_has_calls([ + mock.call(0) + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_handle_appid_selection_when_appid_provided(self, config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + config_mock().appid = 'someid' + self.assertEqual(keyman.handle_appid_selection(), None) + + @mock.patch('aws_okta_keyman.keyman.Config') + @mock.patch('aws_okta_keyman.keyman.okta') + def test_init_okta(self, okta_mock, _config_mock): + okta_mock.OktaSaml = mock.MagicMock() + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.init_okta('troz') + + okta_mock.OktaSaml.assert_has_calls([ + mock.call(mock.ANY, mock.ANY, 'troz') + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + @mock.patch('aws_okta_keyman.keyman.okta') + def test_init_okta_with_oktapreview(self, okta_mock, _config_mock): + okta_mock.OktaSaml = mock.MagicMock() + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.config.oktapreview = True + keyman.init_okta('troz') + + okta_mock.OktaSaml.assert_has_calls([ + mock.call(mock.ANY, mock.ANY, 'troz', oktapreview=True) + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + @mock.patch('aws_okta_keyman.keyman.okta') + def test_init_okta_with_empty_input(self, okta_mock, _config_mock): + okta_mock.EmptyInput = BaseException + okta_mock.OktaSaml = mock.MagicMock() + okta_mock.OktaSaml.side_effect = okta.EmptyInput + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + with self.assertRaises(SystemExit): + keyman.init_okta('troz') + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.auth.return_value = None + + ret = keyman.auth_okta() + self.assertEqual(ret, None) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_bad_password(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.auth.side_effect = okta.InvalidPassword + + with self.assertRaises(SystemExit): + keyman.auth_okta() + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_mfa(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.auth.side_effect = okta.PasscodeRequired('a', 'b', + 'c') + keyman.okta_client.validate_mfa.return_value = True + keyman.user_input = mock.MagicMock() + keyman.user_input.return_value = "000000" + + keyman.auth_okta() + + keyman.okta_client.validate_mfa.assert_has_calls([ + mock.call('a', 'b', "000000") + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_unknown_error(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.auth.side_effect = okta.UnknownError + + with self.assertRaises(SystemExit): + keyman.auth_okta() + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_handle_multiple_roles(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.selector_menu = mock.MagicMock() + keyman.selector_menu.return_value = 0 + roles = [{}, {}] + mock_session = mock.MagicMock() + mock_session.available_roles.return_value = roles + + ret = keyman.handle_multiple_roles(mock_session) + + self.assertEqual(ret, 0) + + keyman.selector_menu.assert_has_calls([ + mock.call([{}, {}], 'role', 'Role') + ]) + mock_session.assert_has_calls([ + mock.call.available_roles(), + mock.call.set_role(mock.ANY), + mock.call.assume_role() + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + @mock.patch('aws_okta_keyman.keyman.aws') + def test_start_session(self, aws_mock, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.get_assertion.return_value = 'assertion' + aws_mock.Session = mock.MagicMock() + + keyman.start_session() + + keyman.okta_client.assert_has_calls([ + mock.call.get_assertion(appid=mock.ANY, apptype='amazon_aws') + ]) + aws_mock.assert_has_calls([ + mock.call.Session('assertion', profile=mock.ANY) + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_aws_auth_loop(self, config_mock): + config_mock().reup = False + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.start_session = mock.MagicMock() + + keyman.aws_auth_loop() + + keyman.start_session.assert_has_calls([ + mock.call(), + mock.call().assume_role() + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_aws_auth_loop_multirole(self, config_mock): + config_mock().reup = False + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.start_session = mock.MagicMock() + keyman.start_session().assume_role.side_effect = aws.MultipleRoles + keyman.handle_multiple_roles = mock.MagicMock() + + keyman.aws_auth_loop() + + keyman.handle_multiple_roles.assert_has_calls([ + mock.call(mock.ANY) + ]) diff --git a/aws_okta_keyman/test/main_test.py b/aws_okta_keyman/test/main_test.py index 0827013..3c59a66 100644 --- a/aws_okta_keyman/test/main_test.py +++ b/aws_okta_keyman/test/main_test.py @@ -1,10 +1,10 @@ from __future__ import unicode_literals -import unittest -import logging + import sys +import unittest + from aws_okta_keyman import main -from aws_okta_keyman import aws -from aws_okta_keyman import okta + if sys.version_info[0] < 3: # Python 2 import mock else: @@ -13,226 +13,13 @@ class MainTest(unittest.TestCase): - def test_setup_logger(self): - # Simple execution test - make sure that the logger code executes and - # returns a root logger. No mocks used here, want to ensure that the - # options passed to the logger are valid. - ret = main.setup_logging() - self.assertEquals(type(ret), type(logging.getLogger())) - - @mock.patch('aws_okta_keyman.aws.Session') - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.main.Config') - @mock.patch('getpass.getpass') - def test_entry_point(self, pass_mock, config_mock, okta_mock, aws_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - okta_mock.return_value = mock.MagicMock() - aws_mock.return_value = mock.MagicMock() - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.org = 'server' - fake_parser.username = 'username' - fake_parser.debug = True - fake_parser.reup = 0 - config_mock.return_value = fake_parser - - main.main('test') - - okta_mock.assert_called_with('server', 'username', 'test_password') - - @mock.patch('aws_okta_keyman.aws.Session') - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.main.Config') - @mock.patch('getpass.getpass') - def test_entry_point_with_preview(self, pass_mock, config_mock, okta_mock, - aws_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - okta_mock.return_value = mock.MagicMock() - aws_mock.return_value = mock.MagicMock() - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.org = 'server' - fake_parser.username = 'username' - fake_parser.debug = True - fake_parser.reup = 0 - fake_parser.oktapreview = True - config_mock.return_value = fake_parser - - main.main('test') - - okta_mock.assert_called_with('server', 'username', 'test_password', - oktapreview=True) - - @mock.patch('aws_okta_keyman.main.user_input') - @mock.patch('aws_okta_keyman.aws.Session') - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.config.Config.get_config') - @mock.patch('getpass.getpass') - def test_entry_point_mfa(self, pass_mock, config_mock, - okta_mock, aws_mock, input_mock): - # First call to this is the password. Second call is the mis-typed - # passcode. Third call is a valid passcode. - pass_mock.side_effect = ['test_password'] - input_mock.side_effect = ['123', '123456'] - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='OktaSaml') - okta_mock.return_value = fake_okta - aws_mock.return_value = mock.MagicMock() - - # Make sure we don't get stuck in a loop, always have to mock out the - # reup option. - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.reup = 0 - config_mock.return_value = fake_parser - - # Now, when we auth() throw a okta.PasscodeRequired exception to - # trigger the MFA requirement. Note, this is only the manually entered - # in passcode MFA req. OktaSaml client automatically handles Okta - # Verify with Push MFA reqs. - fake_okta.auth.side_effect = okta.PasscodeRequired( - fid='test_factor_id', - state_token='test_token') - - # Pretend that the validate_mfa() call fails the first time, and - # succeeds the second time. This simulates a typo on the MFA code. - fake_okta.validate_mfa.side_effect = [False, True] - - main.main('test') - - # Ensure that getpass was called once for the password - pass_mock.assert_has_calls([ - mock.call(), - ]) - - # Ensure that we called auth, then called validate_mfa() twice - each - # with different passcodes. Validating that the user was indeed asked - # for a passcode on each iteration. - fake_okta.assert_has_calls([ - mock.call.auth(), - mock.call.validate_mfa('test_factor_id', 'test_token', '123'), - mock.call.validate_mfa('test_factor_id', 'test_token', '123456'), - ]) - - # Ensure that user_input was called twice; once for the bad input and - # once for the retry - input_mock.assert_has_calls([ - mock.call('MFA Passcode: '), - mock.call('MFA Passcode: '), - ]) - - @mock.patch('aws_okta_keyman.main.user_input') - @mock.patch('aws_okta_keyman.main.aws.Session') - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.config.Config.get_config') - @mock.patch('getpass.getpass') - def test_entry_point_multirole(self, pass_mock, config_mock, - okta_mock, aws_mock, input_mock): - # First call to this is the password. Second call is the mis-typed - # passcode. Third call is a valid passcode. - pass_mock.side_effect = ['test_password'] - input_mock.side_effect = '0' - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='OktaSaml') - okta_mock.return_value = fake_okta - aws_mock.return_value = mock.MagicMock(name='aws_mock') - - # Throw MultipleRoles to validate actions when there are multiple roles - mocked_session = aws_mock.return_value - mocked_session.assume_role.side_effect = [aws.MultipleRoles(), None] - - # Return multiple roles - mocked_session.available_roles = mock.Mock() - roles = [{'role': '1', 'principle': ''}, - {'role': '2', 'principle': ''}] - mocked_session.available_roles.return_value = roles - - # Make sure we don't get stuck in a loop, always have to mock out the - # reup option. - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.reup = 0 - config_mock.return_value = fake_parser - - main.main('test') - - # Ensure that getpass was called once for the password - pass_mock.assert_has_calls([ - mock.call(), - ]) - - # Ensure that user_input was called for the role selection - input_mock.assert_has_calls([ - mock.call('Select a role from above: '), - ]) - - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.config.Config.get_config') - @mock.patch('getpass.getpass') - def test_entry_point_bad_password(self, pass_mock, config_mock, okta_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='fake_okta') - fake_okta.auth.side_effect = okta.InvalidPassword - okta_mock.return_value = fake_okta - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - config_mock.return_value = fake_parser - - with self.assertRaises(SystemExit): - main.main('test') - - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.config.Config.get_config') - @mock.patch('getpass.getpass') - def test_entry_point_okta_unknown(self, pass_mock, config_mock, okta_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='fake_okta') - fake_okta.auth.side_effect = okta.UnknownError - okta_mock.return_value = fake_okta - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - config_mock.return_value = fake_parser - - with self.assertRaises(SystemExit): - main.main('test') - - @mock.patch('aws_okta_keyman.okta.OktaSaml') - @mock.patch('aws_okta_keyman.config.Config.get_config') - @mock.patch('getpass.getpass') - def test_entry_point_bad_input(self, pass_mock, config_mock, okta_mock): - # Pretend that we got some bad input... - pass_mock.return_value = '' - okta_mock.side_effect = okta.EmptyInput - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - config_mock.return_value = fake_parser - - with self.assertRaises(SystemExit): - main.main('test') - - @mock.patch('aws_okta_keyman.main.input') - def test_input(self, mock_input): - mock_input.return_value = 'test' - self.assertEqual('test', main.user_input('input test')) - - @mock.patch('aws_okta_keyman.main.main') - def test_entry_point_func(self, main_mock): + @mock.patch('aws_okta_keyman.main.Keyman') + def test_entry_point_func(self, keyman_mock): + keyman_mock.main.return_value = None with self.assertRaises(SystemExit): main.entry_point() + + keyman_mock.assert_has_calls([ + mock.call(mock.ANY), + mock.call().main() + ]) diff --git a/aws_okta_keyman/test/okta_test.py b/aws_okta_keyman/test/okta_test.py index d997d29..67b17b4 100644 --- a/aws_okta_keyman/test/okta_test.py +++ b/aws_okta_keyman/test/okta_test.py @@ -1,8 +1,12 @@ from __future__ import unicode_literals + +import sys import unittest + import requests -import sys + from aws_okta_keyman import okta + if sys.version_info[0] < 3: # Python 2 import mock else: @@ -19,8 +23,8 @@ 'locale': 'en', 'lastName': 'Foo', 'login': 'bob@foobar.com', - 'firstName': 'Bob', 'timeZone': - 'America/Los_Angeles'}, + 'firstName': 'Bob', + 'timeZone': 'America/Los_Angeles'}, 'id': 'XXXIDXXX' } }, @@ -136,6 +140,14 @@ def test_init_blank_inputs(self): with self.assertRaises(okta.EmptyInput): okta.Okta(organization=None, username='test', password='test') + def test_init_args_values(self): + client = okta.Okta(organization='foo', username='bar', password='baz', + oktapreview=True) + + self.assertEquals(client.base_url, 'https://foo.oktapreview.com') + self.assertEquals(client.username, 'bar') + self.assertEquals(client.password, 'baz') + def test_request_good_response(self): client = okta.Okta('organization', 'username', 'password') client.session = mock.MagicMock(name='session') @@ -215,9 +227,9 @@ def test_validate_mfa_too_short(self): def test_validate_mfa_invalid_token(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') resp = requests.Response() resp.status_code = 403 + client._request = mock.MagicMock(name='_request') client._request.side_effect = requests.exceptions.HTTPError( response=resp) @@ -232,10 +244,10 @@ def test_validate_mfa_invalid_token(self): def test_validate_mfa_unknown_error(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') resp = requests.Response() resp.status_code = 500 resp.body = 'Something bad happened' + client._request = mock.MagicMock(name='_request') client._request.side_effect = requests.exceptions.HTTPError( response=resp) @@ -250,190 +262,225 @@ def test_validate_mfa(self): self.assertEquals(ret, True) self.assertEquals(client.session_token, 'XXXTOKENXXX') - def test_okta_verify_with_push(self): + def test_okta_verify(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client.mfa_wait_loop = mock.MagicMock(name='mfa_wait_loop') - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - SUCCESS_RESPONSE, - ] + client._request.return_value = MFA_WAITING_RESPONSE - ret = client.okta_verify_with_push('123', 'token', sleep=0) + ret = client.okta_verify('123', 'token') self.assertEquals(ret, True) + client.mfa_wait_loop.assert_called_with(MFA_WAITING_RESPONSE, + {'fid': '123', + 'stateToken': 'token'}) - def test_okta_verify_with_push_rejected(self): + def test_okta_verify_failure(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client._request.return_value = MFA_WAITING_RESPONSE + client.mfa_wait_loop = mock.MagicMock(name='mfa_wait_loop') + client.mfa_wait_loop.return_value = None - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_REJECTED_RESPONSE, - ] - - ret = client.okta_verify_with_push('123', 'token', sleep=0) - self.assertEquals(ret, False) - - def test_okta_verify_with_push_timeout(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_TIMEOUT_RESPONSE, - ] - - ret = client.okta_verify_with_push('123', 'token', sleep=0) - self.assertEquals(ret, False) + ret = client.okta_verify('123', 'token') + self.assertEquals(ret, None) @mock.patch('time.sleep', return_value=None) @mock.patch('webbrowser.open_new') @mock.patch('aws_okta_keyman.okta.Process') - def test_duo_auth(self, process_mock, _web_mock, _sleep_mock): + @mock.patch('aws_okta_keyman.okta.Duo') + def test_duo_auth(self, duo_mock, process_mock, _web_mock, _sleep_mock): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') - + client._request.return_value = MFA_WAITING_RESPONSE process_mock.start.return_value = None + client.mfa_wait_loop = mock.MagicMock(name='mfa_wait_loop') - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - SUCCESS_RESPONSE, - ] - - ret = client.duo_auth('user', '123', 'token', sleep=0) + ret = client.duo_auth('123', 'token') self.assertEquals(ret, True) + client.mfa_wait_loop.assert_called_with(MFA_WAITING_RESPONSE, + {'fid': '123', + 'stateToken': 'token'}) @mock.patch('time.sleep', return_value=None) @mock.patch('webbrowser.open_new') @mock.patch('aws_okta_keyman.okta.Process') - def test_duo_auth_rejected(self, process_mock, _web_mock, _sleep_mock): + @mock.patch('aws_okta_keyman.okta.Duo') + def test_duo_auth_failure(self, duo_mock, process_mock, _web_mock, + _sleep_mock): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') - + client._request.return_value = MFA_WAITING_RESPONSE process_mock.start.return_value = None + client.mfa_wait_loop = mock.MagicMock(name='mfa_wait_loop') + client.mfa_wait_loop.return_value = None - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_REJECTED_RESPONSE, - ] - - ret = client.duo_auth('user', '123', 'token', sleep=0) - self.assertEquals(ret, False) + ret = client.duo_auth('123', 'token') + self.assertEquals(ret, None) - @mock.patch('time.sleep', return_value=None) - @mock.patch('webbrowser.open_new') - @mock.patch('aws_okta_keyman.okta.Process') - def test_duo_auth_timeout(self, process_mock, _web_mock, _sleep_mock): + def test_auth(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client._request.side_effect = [SUCCESS_RESPONSE] - process_mock.start.return_value = None + ret = client.auth() + self.assertEquals(ret, None) - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_TIMEOUT_RESPONSE, - ] + def test_auth_mfa_challenge(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='_request') + client._request.side_effect = [MFA_CHALLENGE_RESPONSE_OKTA_VERIFY] + client.handle_mfa_response = mock.MagicMock(name='handle_mfa_response') + client.handle_mfa_response.return_value = None - ret = client.duo_auth('user', '123', 'token', sleep=0) - self.assertEquals(ret, False) + ret = client.auth() + self.assertEquals(ret, None) def test_auth_bad_password(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - resp = requests.Response() resp.status_code = 401 resp.body = 'Bad Password' + client._request = mock.MagicMock(name='_request') client._request.side_effect = requests.exceptions.HTTPError( response=resp) with self.assertRaises(okta.InvalidPassword): client.auth() - def test_auth(self): + def test_auth_with_unexpected_response(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client._request.side_effect = [{}] - client._request.side_effect = [SUCCESS_RESPONSE] - - ret = client.auth() - self.assertEquals(ret, None) + with self.assertRaises(okta.UnknownError): + client.auth() def test_auth_requires_mfa_enroll(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') - client._request.side_effect = [MFA_ENROLL_RESPONSE] with self.assertRaises(okta.UnknownError): client.auth() - def test_auth_trigger_okta_verify(self): + def test_handle_mfa_response_trigger_okta_verify(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client.okta_verify_with_push = mock.MagicMock( - name='okta_verify_with_push') + client.okta_verify = mock.MagicMock( + name='okta_verify') - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_OKTA_VERIFY] + ret = client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_OKTA_VERIFY) - ret = client.auth() - self.assertEquals(ret, None) - client.okta_verify_with_push.assert_has_calls([ + self.assertEquals(ret, True) + client.okta_verify.assert_has_calls([ mock.call('abcd', 'token') ]) - def test_auth_trigger_okta_verify_canceled(self): + def test_handle_mfa_response_trigger_okta_verify_canceled(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client.okta_verify_with_push = mock.MagicMock( - name='okta_verify_with_push') - client.okta_verify_with_push.side_effect = KeyboardInterrupt - - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_OKTA_VERIFY] + client.okta_verify = mock.MagicMock( + name='okta_verify') + client.okta_verify.return_value = None with self.assertRaises(okta.UnknownError): - client.auth() + client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_OKTA_VERIFY) - def test_auth_trigger_duo_auth(self): + def test_handle_mfa_response_trigger_duo_auth(self): client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client.duo_auth = mock.MagicMock( - name='duo_auth') - - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_DUO_AUTH] + client.duo_auth = mock.MagicMock(name='duo_auth') + client.duo_auth.return_value = True - ret = client.auth() - self.assertEquals(ret, None) + ret = client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_DUO_AUTH) + self.assertEquals(ret, True) client.duo_auth.assert_has_calls([ - mock.call(123, 'abcd', 'token') + mock.call('abcd', 'token') ]) - def test_auth_throws_passcode_required(self): + def test_handle_mfa_response_throws_passcode_required(self): + client = okta.Okta('organization', 'username', 'password') + + with self.assertRaises(okta.PasscodeRequired): + client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_PASSCODE) + + def test_mfa_wait_loop_success(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='_request') + client._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + SUCCESS_RESPONSE, + ] + data = {'fid': '123', 'stateToken': 'token'} + + ret = client.mfa_wait_loop(MFA_WAITING_RESPONSE, data, sleep=0) + expected = { + '_embedded': { + 'user': { + 'id': 'XXXIDXXX', + 'profile': { + 'firstName': 'Bob', + 'lastName': 'Foo', + 'locale': 'en', + 'login': 'bob@foobar.com', + 'timeZone': 'America/Los_Angeles' + } + } + }, + 'expiresAt': mock.ANY, + 'sessionToken': 'XXXTOKENXXX', + 'status': 'SUCCESS'} + self.assertEquals(ret, expected) + + def test_mfa_wait_loop_rejected(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_REJECTED_RESPONSE, + ] + data = {'fid': '123', 'stateToken': 'token'} - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_PASSCODE] + ret = client.mfa_wait_loop(MFA_WAITING_RESPONSE, data, sleep=0) + self.assertEquals(ret, None) - with self.assertRaises(okta.PasscodeRequired): - client.auth() + def test_mfa_wait_loop_timeout(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='_request') + client._request.side_effect = [ + MFA_WAITING_RESPONSE, + KeyboardInterrupt + ] + data = {'fid': '123', 'stateToken': 'token'} - def test_auth_with_unexpected_response(self): + ret = client.mfa_wait_loop(MFA_WAITING_RESPONSE, data, sleep=0) + self.assertEquals(ret, None) + + def test_mfa_wait_loop_user_cancel(self): client = okta.Okta('organization', 'username', 'password') client._request = mock.MagicMock(name='_request') + client._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_TIMEOUT_RESPONSE, + ] + data = {'fid': '123', 'stateToken': 'token'} - client._request.side_effect = [{}] + ret = client.mfa_wait_loop(MFA_WAITING_RESPONSE, data, sleep=0) + self.assertEquals(ret, None) - with self.assertRaises(okta.UnknownError): - client.auth() + +class PasscodeRequiredTest(unittest.TestCase): + def test_class_properties(self): + error_response = None + try: + raise okta.PasscodeRequired('fid', 'state_token', 'provider') + except okta.PasscodeRequired as err: + error_response = err + + self.assertEquals(error_response.fid, 'fid') + self.assertEquals(error_response.state_token, 'state_token') + self.assertEquals(error_response.provider, 'provider') diff --git a/setup.py b/setup.py index 6390b9f..33fe169 100644 --- a/setup.py +++ b/setup.py @@ -16,11 +16,9 @@ import os import sys -from setuptools import Command -from setuptools import setup -from setuptools import find_packages +from setuptools import Command, find_packages, setup -from aws_okta_keyman.metadata import __desc__, __version__, __desc_long__ +from aws_okta_keyman.metadata import __desc__, __desc_long__, __version__ PACKAGE = 'aws_okta_keyman' DIR = os.path.dirname(os.path.realpath(__file__))