Skip to content

Commit

Permalink
improve accessibility saml2 urls (#2068)
Browse files Browse the repository at this point in the history
* improve accessibility saml2 urls

* resolve comments
  • Loading branch information
nilupulmanodya authored Nov 2, 2023
1 parent 30a04ea commit a2e10e1
Showing 1 changed file with 97 additions and 104 deletions.
201 changes: 97 additions & 104 deletions mslib/mscolab/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ class mscolab_auth:
("add_new_user_here", "add_md5_digest_of_PASSWORD_here")]
__file__ = None

# setup idp login config
if mscolab_settings.USE_SAML2:
setup_saml2_backend()


# setup http auth
if mscolab_settings.__dict__.get('enable_basic_http_authentication', False):
logging.debug("Enabling basic HTTP authentication. Username and "
Expand Down Expand Up @@ -204,9 +199,9 @@ def get_idp_entity_id(selected_idp):
Finds the entity_id from the configured IDPs
:return: the entity_id of the idp or None
"""
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
idps = idp_config['idp_data']['saml2client'].metadata.identity_providers()
for config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == config['idp_identity_name']:
idps = config['idp_data']['saml2client'].metadata.identity_providers()
only_idp = idps[0]
entity_id = only_idp
return entity_id
Expand Down Expand Up @@ -735,76 +730,59 @@ def reset_request():
return render_template('errors/403.html'), 403


@APP.route("/metadata/<idp_identity_name>", methods=['GET'])
def metadata(idp_identity_name):
"""Return the SAML metadata XML for the requested IDP"""
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404

if mscolab_settings.USE_SAML2:
# setup idp login config
setup_saml2_backend()

@APP.route('/available_idps/', methods=['GET'])
def available_idps():
"""
This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module.
If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS
and renders the 'idp/available_idps.html' template with the list of configured IDPs.
"""
if mscolab_settings.USE_SAML2:
# set routes for SSO
@APP.route('/available_idps/', methods=['GET'])
def available_idps():
"""
This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module.
If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS
and renders the 'idp/available_idps.html' template with the list of configured IDPs.
"""
configured_idps = setup_saml2_backend.CONFIGURED_IDPS
return render_template('idp/available_idps.html', configured_idps=configured_idps), 200
return render_template('errors/403.html'), 403


@APP.route("/idp_login/", methods=['POST'])
def idp_login():
"""Handle the login process for the user by selected IDP"""
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
break
@APP.route("/idp_login/", methods=['POST'])
def idp_login():
"""Handle the login process for the user by selected IDP"""
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == config['idp_identity_name']:
sp_config = config['idp_data']['saml2client']
break

try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403


@APP.route('/<path:url>', methods=['POST'])
def acs_post_handler(url):
"""
Function to handle unknown POST requests,
Implemented to Handle the SAML authentication response received via POST request from configured IDPs.
"""
try:
# implementation for handle configured saml assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
# Check if the requested URL exists in the assertion_consumer_endpoints dictionary
url_with_slash = '/' + url
url_exists_with_slash = url_with_slash in idp_config['idp_data']['assertion_consumer_endpoints']
url_exists_without_slash = url in idp_config['idp_data']['assertion_consumer_endpoints']
if url_exists_without_slash or url_exists_with_slash:
try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403

def create_acs_post_handler(config):
"""
Create acs_post_handler function for the given idp_config.
"""
def acs_post_handler():
"""
Function to handle SAML authentication response.
"""
try:
outstanding_queries = {}
binding = BINDING_HTTP_POST
authn_response = idp_config['idp_data']['saml2client'].parse_authn_request_response(
authn_response = config['idp_data']['saml2client'].parse_authn_request_response(
request.form["SAMLResponse"], binding, outstanding=outstanding_queries
)
email = None
Expand All @@ -815,7 +793,6 @@ def acs_post_handler(url):
username = authn_response.ava["givenName"][0]
token = generate_confirmation_token(email)
except (NameError, AttributeError, KeyError):

try:
# Initialize an empty dictionary to store attribute values
attributes = {}
Expand All @@ -832,46 +809,62 @@ def acs_post_handler(url):
email = attributes["email"]
username = attributes["givenName"]
token = generate_confirmation_token(email)

except (NameError, AttributeError, KeyError):
render_template('errors/403.html'), 403
return render_template('errors/403.html'), 403

if email is not None and username is not None:
idp_user_db_state = create_or_update_idp_user(email, username, token,
idp_config['idp_identity_name'])
idp_user_db_state = create_or_update_idp_user(email,
username, token, idp_config['idp_identity_name'])
if idp_user_db_state:
return render_template('idp/idp_login_success.html', token=token), 200
else:
return render_template('errors/500.html'), 500
else:
return render_template('errors/500.html'), 500
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403

return render_template('errors/500.html'), 500
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403
return acs_post_handler

@APP.route('/idp_login_auth/', methods=['POST'])
def idp_login_auth():
"""Handle the SAML authentication validation of client application."""
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
# Implementation for handling configured SAML assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
for assertion_consumer_endpoint in idp_config['idp_data']['assertion_consumer_endpoints']:
# Dynamically add the route for the current endpoint
APP.add_url_rule(f'/{assertion_consumer_endpoint}/', assertion_consumer_endpoint,
create_acs_post_handler(idp_config), methods=['POST'])

@APP.route('/idp_login_auth/', methods=['POST'])
def idp_login_auth():
"""Handle the SAML authentication validation of client application."""
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401

@APP.route("/metadata/<idp_identity_name>", methods=['GET'])
def metadata(idp_identity_name):
"""Return the SAML metadata XML for the requested IDP"""
for config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == config['idp_identity_name']:
sp_config = config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404


def start_server(app, sockio, cm, fm, port=8083):
Expand Down

0 comments on commit a2e10e1

Please sign in to comment.