From fa7b00bbabf33be4131b104f4e19649aa79b454e Mon Sep 17 00:00:00 2001 From: Ruhul Alam <3430752+ruhulio@users.noreply.github.com> Date: Mon, 13 Nov 2023 21:24:46 -0600 Subject: [PATCH] feat: Add support for step up authentication (#140) * feat: Add support for step up authentication When "Extra Verification" is required in authenticate_to_roles, make an additional request to '/api/v1/authn' using the current state token. Use '/login/sessionCookieRedirect' in authenticate_to_roles to ensure the the 'HTTP_client' cookies have the values needed for step up authentication and also the device token. Add optional support for device token across session so that step up authentication does not require multiple MFA requests. Add device token options to 'config.py'. Add getting and setting the device token to 'http_client.py'. Add storing the device token to the ini file to 'user.py'. Add the device token options to 'docs/README.md'. * fix: Address linting issues * chore: Revert version change * fix: Clear out MFA response after using it --- docs/README.md | 4 ++- tests/unit/test_aws.py | 8 +++++- tests/unit/test_http_client.py | 19 +++++++++++++ tests/unit/test_okta.py | 49 ++++++++++++++++++++++++++++++++++ tests/unit/test_user.py | 20 ++++++++++++++ tokendito/aws.py | 15 +++++++++-- tokendito/config.py | 2 ++ tokendito/http_client.py | 20 ++++++++++++++ tokendito/okta.py | 34 ++++++++++++++++++++++- tokendito/tool.py | 18 ++++++++++++- tokendito/user.py | 33 ++++++++++++++++++++++- 11 files changed, 215 insertions(+), 7 deletions(-) diff --git a/docs/README.md b/docs/README.md index 9788f582..8618145e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -74,7 +74,7 @@ tokendito --profile engineer usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--password OKTA_PASSWORD] [--profile USER_CONFIG_PROFILE] [--config-file USER_CONFIG_FILE] [--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT] [--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE] - [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--quiet] + [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--quiet] Gets an STS token to use with the AWS CLI and SDK. @@ -112,6 +112,7 @@ options: --okta-mfa OKTA_MFA Sets the MFA method --okta-mfa-response OKTA_MFA_RESPONSE Sets the MFA response to a challenge + --use-device-token Use device token across sessions --quiet Suppress output ``` @@ -153,6 +154,7 @@ The following table lists the environment variable and user configuration entry | `--okta-tile` | `TOKENDITO_OKTA_TILE` | `okta_tile` | | `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` | | `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` | +| `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` | | `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` | # Configuration file location diff --git a/tests/unit/test_aws.py b/tests/unit/test_aws.py index 6f214c0b..25c6a426 100644 --- a/tests/unit/test_aws.py +++ b/tests/unit/test_aws.py @@ -94,6 +94,7 @@ def test_select_assumeable_role_no_tiles(): def test_authenticate_to_roles(status_code, monkeypatch): """Test if function return correct response.""" from tokendito.aws import authenticate_to_roles + from tokendito.config import Config import tokendito.http_client as http_client # Create a mock response object @@ -104,7 +105,12 @@ def test_authenticate_to_roles(status_code, monkeypatch): # Use monkeypatch to replace the HTTP_client.get method with the mock monkeypatch.setattr(http_client.HTTP_client, "get", lambda *args, **kwargs: mock_response) + pytest_config = Config( + okta={ + "org": "https://acme.okta.org/", + } + ) cookies = {"some_cookie": "some_value"} with pytest.raises(SystemExit): - authenticate_to_roles([("http://test.url.com", "")], cookies) + authenticate_to_roles(pytest_config, [("http://test.url.com", "")], cookies) diff --git a/tests/unit/test_http_client.py b/tests/unit/test_http_client.py index aa13927d..46058332 100644 --- a/tests/unit/test_http_client.py +++ b/tests/unit/test_http_client.py @@ -155,3 +155,22 @@ def test_post_logging_on_exception(client, mocker): with pytest.raises(SystemExit): client.post("http://test.com", json={"key": "value"}) mock_logger.assert_called() + + +def test_get_device_token(client): + """Test getting device token from the session.""" + device_token = "test-device-token" + cookies = {"DT": device_token} + client.set_cookies(cookies) + + # Check if the device token is set correctly in the session + assert client.get_device_token() == device_token + + +def test_set_device_token(client): + """Test setting device token in the session.""" + device_token = "test-device-token" + client.set_device_token("http://test.com", device_token) + + # Check if the device token is set correctly in the session + assert client.session.cookies.get("DT") == device_token diff --git a/tests/unit/test_okta.py b/tests/unit/test_okta.py index b2769b22..fef61c32 100644 --- a/tests/unit/test_okta.py +++ b/tests/unit/test_okta.py @@ -540,6 +540,55 @@ def test_authenticate(mocker): assert okta.authenticate(pytest_config) == error +def test_step_up_authenticate(mocker): + """Test set up authenticate method.""" + from tokendito import okta + from tokendito.config import Config + from tokendito.http_client import HTTP_client + + pytest_config = Config( + okta={ + "username": "pytest", + "org": "https://acme.okta.org/", + } + ) + + state_token = "test-state-token" + + # Test missing auth type + mocker.patch("tokendito.okta.get_auth_properties", return_value={}) + assert okta.step_up_authenticate(pytest_config, state_token) is False + + # Test unsupported auth type + mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "SAML2"}) + assert okta.step_up_authenticate(pytest_config, state_token) is False + + # Test supported auth type... + mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "OKTA"}) + + # ...with SUCCESS status + mock_response_data = {"status": "SUCCESS"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + + assert okta.step_up_authenticate(pytest_config, state_token) is True + + # ...with MFA_REQUIRED status + mock_response_data = {"status": "MFA_REQUIRED"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + patched_mfa_challenge = mocker.patch.object( + okta, "mfa_challenge", return_value="test-session-token" + ) + + assert okta.step_up_authenticate(pytest_config, state_token) is True + assert patched_mfa_challenge.call_count == 1 + + # ...with unknown status + mock_response_data = {"status": "unknown"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + + assert okta.step_up_authenticate(pytest_config, state_token) is False + + def test_local_auth(mocker): """Test local auth method.""" from tokendito import okta diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py index 1e80835f..0cb823af 100644 --- a/tests/unit/test_user.py +++ b/tests/unit/test_user.py @@ -466,6 +466,26 @@ def test_update_configuration(tmpdir): assert ret.okta["mfa"] == "pytest" +def test_update_device_token(tmpdir): + """Test writing and reading device token to a configuration file.""" + from tokendito import user + from tokendito.config import Config + + path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini") + + device_token = "test-device-token" + + pytest_config = Config( + okta={"device_token": device_token}, + user={"config_file": path, "config_profile": "pytest"}, + ) + + # Write out a config file via configure() and ensure it's functional + user.update_device_token(pytest_config) + ret = user.process_ini_file(path, "pytest") + assert ret.okta["device_token"] == device_token + + def test_process_ini_file(tmpdir): """Test whether ini config elements are set correctly. diff --git a/tokendito/aws.py b/tokendito/aws.py index ef5634be..9bfc66be 100644 --- a/tokendito/aws.py +++ b/tokendito/aws.py @@ -47,7 +47,7 @@ def get_output_types(): return ["json", "text", "csv", "yaml", "yaml-stream"] -def authenticate_to_roles(urls, cookies=None): +def authenticate_to_roles(config, urls, cookies=None): """Authenticate AWS user with saml. :param urls: list of tuples or tuple, with tiles info @@ -67,11 +67,22 @@ def authenticate_to_roles(urls, cookies=None): logger.info(f"Discovering roles in {tile_count} tile{plural}.") for url, label in url_list: response = HTTP_client.get(url) # Use the HTTPClient's get method + + session_url = config.okta["org"] + "/login/sessionCookieRedirect" + params = {"token": cookies.get("sessionToken"), "redirectUrl": url} + + response = HTTP_client.get(session_url, params=params) + saml_response_string = response.text saml_xml = okta.extract_saml_response(saml_response_string) if not saml_xml: - if "Extra Verification" in saml_response_string: + state_token = okta.extract_state_token(saml_response_string) + if "Extra Verification" in saml_response_string and state_token: + logger.info(f"Step-Up authentication required for {url}.") + if okta.step_up_authenticate(config, state_token): + return authenticate_to_roles(config, urls, cookies) + logger.error("Step-Up Authentication required, but not supported.") elif "App Access Locked" in saml_response_string: logger.error( diff --git a/tokendito/config.py b/tokendito/config.py index eedb3027..c06471a6 100644 --- a/tokendito/config.py +++ b/tokendito/config.py @@ -29,6 +29,7 @@ class Config(object): encoding=_default_encoding, loglevel="INFO", log_output_file="", + use_device_token=False, mask_items=[], quiet=False, ), @@ -47,6 +48,7 @@ class Config(object): mfa_response=None, tile=None, org=None, + device_token=None, ), ) diff --git a/tokendito/http_client.py b/tokendito/http_client.py index cb67acc1..bd8f1cd2 100644 --- a/tokendito/http_client.py +++ b/tokendito/http_client.py @@ -4,6 +4,7 @@ import logging import sys +from urllib.parse import urlparse import requests from tokendito import __title__ @@ -75,5 +76,24 @@ def reset(self): self.session.headers = requests.utils.default_headers() self.session.headers.update({"User-Agent": user_agent}) + def get_device_token(self): + """Get the device token from the current session cookies. + + :return: Device token or None + """ + return self.session.cookies.get("DT", None) + + def set_device_token(self, org_url, device_token): + """Set the device token in the current session cookies. + + :param org_url: The organization URL + :param device_token: The device token + :return: None + """ + if not device_token: + return + + self.session.cookies.set("DT", device_token, domain=urlparse(org_url).netloc, path="/") + HTTP_client = HTTPClient() diff --git a/tokendito/okta.py b/tokendito/okta.py index e23e5981..10e94ad5 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -258,6 +258,35 @@ def authenticate(config): return sid +def step_up_authenticate(config, state_token): + """Try to step up authenticate the user. Only supported for local auth. + + :param config: Configuration object + :param state_token: The state token + :return: True if step up authentication was successful; False otherwise + """ + auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"]) + if "type" not in auth_properties or not is_local_auth(auth_properties): + return False + + headers = {"content-type": "application/json", "accept": "application/json"} + payload = {"stateToken": state_token} + + auth = HTTP_client.post( + f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True + ) + + status = auth.get("status", None) + if status == "SUCCESS": + return True + elif status == "MFA_REQUIRED": + mfa_challenge(config, headers, auth) + return True + + logger.error("Okta auth failed: unknown status for step up authentication.") + return False + + def is_local_auth(auth_properties): """Check whether authentication happens locally. @@ -429,7 +458,7 @@ def extract_state_token(html): state_token = None pattern = re.compile(r"var stateToken = '(?P.*)';", re.MULTILINE) - script = soup.find("script", text=pattern) + script = soup.find("script", string=pattern) if type(script) is bs4.element.Tag: match = pattern.search(script.text) if match: @@ -605,6 +634,9 @@ def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, paylo user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"]) logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]") + # Clear out any MFA response since it is no longer valid + config.okta["mfa_response"] = None + return mfa_verify diff --git a/tokendito/tool.py b/tokendito/tool.py index c3477a8a..2c3bd0fa 100644 --- a/tokendito/tool.py +++ b/tokendito/tool.py @@ -38,6 +38,16 @@ def cli(args): ) sys.exit(1) + if config.user["use_device_token"]: + device_token = config.okta["device_token"] + if device_token: + HTTP_client.set_device_token(config.okta["org"], device_token) + else: + logger.warning( + f"Device token unavailable for config profile {args.user_config_profile}. " + "May see multiple MFA requests this time." + ) + # Authenticate to okta session_cookies = okta.authenticate(config) @@ -50,7 +60,7 @@ def cli(args): config.okta["tile"] = user.discover_tiles(config.okta["org"]) # Authenticate to AWS roles - auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies) + auth_tiles = aws.authenticate_to_roles(config, config.okta["tile"], cookies=session_cookies) (role_response, role_name) = aws.select_assumeable_role(auth_tiles) @@ -70,4 +80,10 @@ def cli(args): output=config.aws["output"], ) + device_token = HTTP_client.get_device_token() + if config.user["use_device_token"] and device_token: + logger.info(f"Saving device token to config profile {args.user_config_profile}") + config.okta["device_token"] = device_token + user.update_device_token(config) + user.display_selected_role(profile_name=config.aws["profile"], role_response=role_response) diff --git a/tokendito/user.py b/tokendito/user.py index 955964c6..fd1cfc9b 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -144,6 +144,13 @@ def parse_cli_args(args): help="Sets the MFA response to a challenge. You " "can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.", ) + parser.add_argument( + "--use-device-token", + dest="user_use_device_token", + action="store_true", + default=False, + help="Use device token across sessions", + ) parser.add_argument( "--quiet", dest="user_quiet", @@ -911,6 +918,27 @@ def update_configuration(config): logger.info(f"Updated {ini_file} with profile {profile}") +def update_device_token(config): + """Update configuration file on local system with device token. + + :param config: the current configuration + :return: None + """ + logger.debug("Update configuration file on local system with device token.") + ini_file = config.user["config_file"] + profile = config.user["config_profile"] + + contents = {} + # Copy relevant parts of the configuration into an dictionary that + # will be written out to disk + if "device_token" in config.okta and config.okta["device_token"] is not None: + contents["okta_device_token"] = config.okta["device_token"] + + logger.debug(f"Adding {contents} to config file.") + update_ini(profile=profile, ini_file=ini_file, **contents) + logger.info(f"Updated {ini_file} with profile {profile}") + + def set_local_credentials(response={}, role="default", region="us-east-1", output="json"): """Write to local files to insert credentials. @@ -1227,8 +1255,11 @@ def request_cookies(url, session_token): add_sensitive_value_to_be_masked(sess_id) # create cookies with sid 'sid'. + domain = urlparse(url).netloc + cookies = requests.cookies.RequestsCookieJar() - cookies.set("sid", sess_id, domain=urlparse(url).netloc, path="/") + cookies.set("sid", sess_id, domain=domain, path="/") + cookies.set("sessionToken", session_token, domain=domain, path="/") # Log the session cookies. logger.debug(f"Received session cookies: {cookies}")