From a2e10e1ce75daa73306a7e1076f124a3ad8501da Mon Sep 17 00:00:00 2001 From: Nilupul Manodya Date: Thu, 2 Nov 2023 18:55:23 +0530 Subject: [PATCH] improve accessibility saml2 urls (#2068) * improve accessibility saml2 urls * resolve comments --- mslib/mscolab/server.py | 201 +++++++++++++++++++--------------------- 1 file changed, 97 insertions(+), 104 deletions(-) diff --git a/mslib/mscolab/server.py b/mslib/mscolab/server.py index 1824b7fdc..2cb0e7c01 100644 --- a/mslib/mscolab/server.py +++ b/mslib/mscolab/server.py @@ -68,11 +68,6 @@ class mscolab_auth: ("add_new_user_here", "add_md5_digest_of_PASSWORD_here")] __file__ = None -# setup idp login config -if mscolab_settings.USE_SAML2: - setup_saml2_backend() - - # setup http auth if mscolab_settings.__dict__.get('enable_basic_http_authentication', False): logging.debug("Enabling basic HTTP authentication. Username and " @@ -204,9 +199,9 @@ def get_idp_entity_id(selected_idp): Finds the entity_id from the configured IDPs :return: the entity_id of the idp or None """ - for idp_config in setup_saml2_backend.CONFIGURED_IDPS: - if selected_idp == idp_config['idp_identity_name']: - idps = idp_config['idp_data']['saml2client'].metadata.identity_providers() + for config in setup_saml2_backend.CONFIGURED_IDPS: + if selected_idp == config['idp_identity_name']: + idps = config['idp_data']['saml2client'].metadata.identity_providers() only_idp = idps[0] entity_id = only_idp return entity_id @@ -735,76 +730,59 @@ def reset_request(): return render_template('errors/403.html'), 403 -@APP.route("/metadata/", methods=['GET']) -def metadata(idp_identity_name): - """Return the SAML metadata XML for the requested IDP""" - for idp_config in setup_saml2_backend.CONFIGURED_IDPS: - if idp_identity_name == idp_config['idp_identity_name']: - sp_config = idp_config['idp_data']['saml2client'] - metadata_string = create_metadata_string( - None, sp_config.config, 4, None, None, None, None, None - ).decode("utf-8") - return Response(metadata_string, mimetype="text/xml") - return render_template('errors/404.html'), 404 - +if mscolab_settings.USE_SAML2: + # setup idp login config + setup_saml2_backend() -@APP.route('/available_idps/', methods=['GET']) -def available_idps(): - """ - This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module. - If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS - and renders the 'idp/available_idps.html' template with the list of configured IDPs. - """ - if mscolab_settings.USE_SAML2: + # set routes for SSO + @APP.route('/available_idps/', methods=['GET']) + def available_idps(): + """ + This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module. + If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS + and renders the 'idp/available_idps.html' template with the list of configured IDPs. + """ configured_idps = setup_saml2_backend.CONFIGURED_IDPS return render_template('idp/available_idps.html', configured_idps=configured_idps), 200 - return render_template('errors/403.html'), 403 - -@APP.route("/idp_login/", methods=['POST']) -def idp_login(): - """Handle the login process for the user by selected IDP""" - selected_idp = request.form.get('selectedIdentityProvider') - sp_config = None - for idp_config in setup_saml2_backend.CONFIGURED_IDPS: - if selected_idp == idp_config['idp_identity_name']: - sp_config = idp_config['idp_data']['saml2client'] - break + @APP.route("/idp_login/", methods=['POST']) + def idp_login(): + """Handle the login process for the user by selected IDP""" + selected_idp = request.form.get('selectedIdentityProvider') + sp_config = None + for config in setup_saml2_backend.CONFIGURED_IDPS: + if selected_idp == config['idp_identity_name']: + sp_config = config['idp_data']['saml2client'] + break - try: - _, response_binding = sp_config.config.getattr("endpoints", "sp")[ - "assertion_consumer_service" - ][0] - entity_id = get_idp_entity_id(selected_idp) - _, binding, http_args = sp_config.prepare_for_negotiated_authenticate( - entityid=entity_id, - response_binding=response_binding, - ) - if binding == BINDING_HTTP_REDIRECT: - headers = dict(http_args["headers"]) - return redirect(str(headers["Location"]), code=303) - return Response(http_args["data"], headers=http_args["headers"]) - except (NameError, AttributeError): - return render_template('errors/403.html'), 403 - - -@APP.route('/', methods=['POST']) -def acs_post_handler(url): - """ - Function to handle unknown POST requests, - Implemented to Handle the SAML authentication response received via POST request from configured IDPs. - """ - try: - # implementation for handle configured saml assertion consumer endpoints - for idp_config in setup_saml2_backend.CONFIGURED_IDPS: - # Check if the requested URL exists in the assertion_consumer_endpoints dictionary - url_with_slash = '/' + url - url_exists_with_slash = url_with_slash in idp_config['idp_data']['assertion_consumer_endpoints'] - url_exists_without_slash = url in idp_config['idp_data']['assertion_consumer_endpoints'] - if url_exists_without_slash or url_exists_with_slash: + try: + _, response_binding = sp_config.config.getattr("endpoints", "sp")[ + "assertion_consumer_service" + ][0] + entity_id = get_idp_entity_id(selected_idp) + _, binding, http_args = sp_config.prepare_for_negotiated_authenticate( + entityid=entity_id, + response_binding=response_binding, + ) + if binding == BINDING_HTTP_REDIRECT: + headers = dict(http_args["headers"]) + return redirect(str(headers["Location"]), code=303) + return Response(http_args["data"], headers=http_args["headers"]) + except (NameError, AttributeError): + return render_template('errors/403.html'), 403 + + def create_acs_post_handler(config): + """ + Create acs_post_handler function for the given idp_config. + """ + def acs_post_handler(): + """ + Function to handle SAML authentication response. + """ + try: outstanding_queries = {} binding = BINDING_HTTP_POST - authn_response = idp_config['idp_data']['saml2client'].parse_authn_request_response( + authn_response = config['idp_data']['saml2client'].parse_authn_request_response( request.form["SAMLResponse"], binding, outstanding=outstanding_queries ) email = None @@ -815,7 +793,6 @@ def acs_post_handler(url): username = authn_response.ava["givenName"][0] token = generate_confirmation_token(email) except (NameError, AttributeError, KeyError): - try: # Initialize an empty dictionary to store attribute values attributes = {} @@ -832,46 +809,62 @@ def acs_post_handler(url): email = attributes["email"] username = attributes["givenName"] token = generate_confirmation_token(email) - except (NameError, AttributeError, KeyError): - render_template('errors/403.html'), 403 + return render_template('errors/403.html'), 403 if email is not None and username is not None: - idp_user_db_state = create_or_update_idp_user(email, username, token, - idp_config['idp_identity_name']) + idp_user_db_state = create_or_update_idp_user(email, + username, token, idp_config['idp_identity_name']) if idp_user_db_state: return render_template('idp/idp_login_success.html', token=token), 200 - else: - return render_template('errors/500.html'), 500 - else: return render_template('errors/500.html'), 500 - except (NameError, AttributeError, KeyError): - return render_template('errors/403.html'), 403 - + return render_template('errors/500.html'), 500 + except (NameError, AttributeError, KeyError): + return render_template('errors/403.html'), 403 + return acs_post_handler -@APP.route('/idp_login_auth/', methods=['POST']) -def idp_login_auth(): - """Handle the SAML authentication validation of client application.""" - try: - data = request.get_json() - token = data.get('token') - email = confirm_token(token, expiration=1200) - if email: - user = check_login(email, token) - if user: - random_token = secrets.token_hex(16) - user.hash_password(random_token) - db.session.add(user) - db.session.commit() - return json.dumps({ - "success": True, - 'token': random_token, - 'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid} - }) + # Implementation for handling configured SAML assertion consumer endpoints + for idp_config in setup_saml2_backend.CONFIGURED_IDPS: + for assertion_consumer_endpoint in idp_config['idp_data']['assertion_consumer_endpoints']: + # Dynamically add the route for the current endpoint + APP.add_url_rule(f'/{assertion_consumer_endpoint}/', assertion_consumer_endpoint, + create_acs_post_handler(idp_config), methods=['POST']) + + @APP.route('/idp_login_auth/', methods=['POST']) + def idp_login_auth(): + """Handle the SAML authentication validation of client application.""" + try: + data = request.get_json() + token = data.get('token') + email = confirm_token(token, expiration=1200) + if email: + user = check_login(email, token) + if user: + random_token = secrets.token_hex(16) + user.hash_password(random_token) + db.session.add(user) + db.session.commit() + return json.dumps({ + "success": True, + 'token': random_token, + 'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid} + }) + return jsonify({"success": False}), 401 return jsonify({"success": False}), 401 - return jsonify({"success": False}), 401 - except TypeError: - return jsonify({"success": False}), 401 + except TypeError: + return jsonify({"success": False}), 401 + + @APP.route("/metadata/", methods=['GET']) + def metadata(idp_identity_name): + """Return the SAML metadata XML for the requested IDP""" + for config in setup_saml2_backend.CONFIGURED_IDPS: + if idp_identity_name == config['idp_identity_name']: + sp_config = config['idp_data']['saml2client'] + metadata_string = create_metadata_string( + None, sp_config.config, 4, None, None, None, None, None + ).decode("utf-8") + return Response(metadata_string, mimetype="text/xml") + return render_template('errors/404.html'), 404 def start_server(app, sockio, cm, fm, port=8083):