Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve accessibility saml2 urls #2068

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 97 additions & 104 deletions mslib/mscolab/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ class mscolab_auth:
("add_new_user_here", "add_md5_digest_of_PASSWORD_here")]
__file__ = None

# setup idp login config
if mscolab_settings.USE_SAML2:
setup_saml2_backend()


# setup http auth
if mscolab_settings.__dict__.get('enable_basic_http_authentication', False):
logging.debug("Enabling basic HTTP authentication. Username and "
Expand Down Expand Up @@ -204,9 +199,9 @@ def get_idp_entity_id(selected_idp):
Finds the entity_id from the configured IDPs
:return: the entity_id of the idp or None
"""
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
idps = idp_config['idp_data']['saml2client'].metadata.identity_providers()
for config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == config['idp_identity_name']:
idps = config['idp_data']['saml2client'].metadata.identity_providers()
only_idp = idps[0]
entity_id = only_idp
return entity_id
Expand Down Expand Up @@ -735,76 +730,59 @@ def reset_request():
return render_template('errors/403.html'), 403


@APP.route("/metadata/<idp_identity_name>", methods=['GET'])
def metadata(idp_identity_name):
"""Return the SAML metadata XML for the requested IDP"""
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404

if mscolab_settings.USE_SAML2:
# setup idp login config
setup_saml2_backend()

@APP.route('/available_idps/', methods=['GET'])
def available_idps():
"""
This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module.
If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS
and renders the 'idp/available_idps.html' template with the list of configured IDPs.
"""
if mscolab_settings.USE_SAML2:
# set routes for SSO
@APP.route('/available_idps/', methods=['GET'])
def available_idps():
"""
This function checks if IDP (Identity Provider) is enabled in the mscolab_settings module.
If IDP is enabled, it retrieves the configured IDPs from setup_saml2_backend.CONFIGURED_IDPS
and renders the 'idp/available_idps.html' template with the list of configured IDPs.
"""
configured_idps = setup_saml2_backend.CONFIGURED_IDPS
return render_template('idp/available_idps.html', configured_idps=configured_idps), 200
return render_template('errors/403.html'), 403


@APP.route("/idp_login/", methods=['POST'])
def idp_login():
"""Handle the login process for the user by selected IDP"""
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
break
@APP.route("/idp_login/", methods=['POST'])
def idp_login():
"""Handle the login process for the user by selected IDP"""
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == config['idp_identity_name']:
sp_config = config['idp_data']['saml2client']
break

try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403


@APP.route('/<path:url>', methods=['POST'])
def acs_post_handler(url):
"""
Function to handle unknown POST requests,
Implemented to Handle the SAML authentication response received via POST request from configured IDPs.
"""
try:
# implementation for handle configured saml assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
# Check if the requested URL exists in the assertion_consumer_endpoints dictionary
url_with_slash = '/' + url
url_exists_with_slash = url_with_slash in idp_config['idp_data']['assertion_consumer_endpoints']
url_exists_without_slash = url in idp_config['idp_data']['assertion_consumer_endpoints']
if url_exists_without_slash or url_exists_with_slash:
try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403

def create_acs_post_handler(config):
"""
Create acs_post_handler function for the given idp_config.
"""
def acs_post_handler():
"""
Function to handle SAML authentication response.
"""
try:
outstanding_queries = {}
binding = BINDING_HTTP_POST
authn_response = idp_config['idp_data']['saml2client'].parse_authn_request_response(
authn_response = config['idp_data']['saml2client'].parse_authn_request_response(
request.form["SAMLResponse"], binding, outstanding=outstanding_queries
)
email = None
Expand All @@ -815,7 +793,6 @@ def acs_post_handler(url):
username = authn_response.ava["givenName"][0]
token = generate_confirmation_token(email)
except (NameError, AttributeError, KeyError):

try:
# Initialize an empty dictionary to store attribute values
attributes = {}
Expand All @@ -832,46 +809,62 @@ def acs_post_handler(url):
email = attributes["email"]
username = attributes["givenName"]
token = generate_confirmation_token(email)

except (NameError, AttributeError, KeyError):
render_template('errors/403.html'), 403
return render_template('errors/403.html'), 403

if email is not None and username is not None:
idp_user_db_state = create_or_update_idp_user(email, username, token,
idp_config['idp_identity_name'])
idp_user_db_state = create_or_update_idp_user(email,
username, token, idp_config['idp_identity_name'])
if idp_user_db_state:
return render_template('idp/idp_login_success.html', token=token), 200
else:
return render_template('errors/500.html'), 500
else:
return render_template('errors/500.html'), 500
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403

return render_template('errors/500.html'), 500
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403
return acs_post_handler

@APP.route('/idp_login_auth/', methods=['POST'])
def idp_login_auth():
"""Handle the SAML authentication validation of client application."""
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
# Implementation for handling configured SAML assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
for assertion_consumer_endpoint in idp_config['idp_data']['assertion_consumer_endpoints']:
# Dynamically add the route for the current endpoint
APP.add_url_rule(f'/{assertion_consumer_endpoint}/', assertion_consumer_endpoint,
create_acs_post_handler(idp_config), methods=['POST'])

@APP.route('/idp_login_auth/', methods=['POST'])
def idp_login_auth():
"""Handle the SAML authentication validation of client application."""
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401

@APP.route("/metadata/<idp_identity_name>", methods=['GET'])
def metadata(idp_identity_name):
"""Return the SAML metadata XML for the requested IDP"""
for config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == config['idp_identity_name']:
sp_config = config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404


def start_server(app, sockio, cm, fm, port=8083):
Expand Down