Skip to content

Commit

Permalink
Merge pull request #43 from rciam/develop
Browse files Browse the repository at this point in the history
Improve SAML metadata & login health checks
  • Loading branch information
NicolasLiampotis committed Aug 7, 2024
2 parents d1f32fc + e6aa4f2 commit 8ea2e42
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 25 deletions.
5 changes: 4 additions & 1 deletion rciam_probes.spec
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Name: rciam_probes
Summary: RCIAM related probes - Complete
Group: grnet/rciam
Version: 2.1.1
Version: 2.2.0
Release: %(echo $GIT_COMMIT_DATE).%(echo $GIT_COMMIT_HASH)%{?dist}
Url: https://github.com/rciam/%{name}
License: Apache-2.0
Expand Down Expand Up @@ -135,6 +135,9 @@ rm -rf $RPM_BUILD_ROOT
#fi

%changelog
* Wed Aug 7 2024 Nicolas Liampotis <[email protected]> 2.2.0
- Added support for varying XML namespace prefixes in SAML metadata health checks
- Added support for skipping the IdP Discovery page during login checks
* Mon Aug 5 2024 Nicolas Liampotis <[email protected]> 2.1.1
- Fixed Python requirements. See https://github.com/seleniumbase/SeleniumBase/issues/2782
- Updated geckodriver to v0.34.0
Expand Down
7 changes: 7 additions & 0 deletions rciam_probes/probes/checkhealth/checkhealth.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ def __sp_redirect_disco_n_click(self):
self.__browser.get(self.__args.sp)
evaluate_response_status(self.__browser, self.__args, self.__logger)

if self.__args.skip_idp_discovery:
self.__logger.debug('Skipping IdP discovery page')
return

# In case i have a list of hops
idp_list = self.__args.identity.split(',')
try:
Expand Down Expand Up @@ -469,6 +473,9 @@ def parse_arguments(args):
help='Domain, protocol assumed to be https, e.g. example.com')
parser.add_argument('--logowner', '-o', dest="logowner", default=ParamDefaults.LOG_OWNER.value,
help='Owner of the log file rciam_probes.log under /var/log/rciam_probes/. Default owner is nagios user.')
parser.add_argument('--skip-idp-discovery', dest="skip_idp_discovery",
help='Skip IdP discovery if this flag is present',
action='store_true')
parser.add_argument('--version', '-V', version='%(prog)s 1.2.14', action='version')
return parser.parse_args(args)

Expand Down
63 changes: 40 additions & 23 deletions rciam_probes/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,36 +164,53 @@ def fetch_cert_from_type(metadata_dict, cert_type):
:return: dictionary of certificates {type<str>: certificate<str>}
:rtype: dict
:todo use enumarators instead of fixed values for all,encryption,signing
TODO:
- Use enumarators instead of fixed values for all,encryption,signing
- Improve handling of Keycloak metadata
"""
try:
def find_elements_by_local_name(data, local_name):
"""Finds elements with the given local name in a dictionary-like structure."""
elements = []
for key, value in data.items():
if key.split(':')[-1] == local_name:
elements.append(value)
elif isinstance(value, dict):
elements.extend(find_elements_by_local_name(value, local_name))
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
elements.extend(find_elements_by_local_name(item, local_name))
return elements

x509_list_gen = gen_dict_extract(metadata_dict, 'KeyDescriptor')
x509_list = next(x509_list_gen)
x509_dict = {}
# If all is chosen then return a list with all the certificates

for x509_elem_obj in x509_list:
if isinstance(x509_elem_obj, dict):
mcert_type = ['unknown', x509_elem_obj.get('@use')]['@use' in x509_elem_obj]
key_infos = find_elements_by_local_name(x509_elem_obj, 'KeyInfo')
for key_info in key_infos:
x509_data_elems = find_elements_by_local_name(key_info, 'X509Data')
for x509_data in x509_data_elems:
x509_cert_elems = find_elements_by_local_name(x509_data, 'X509Certificate')
for x509_cert in x509_cert_elems:
x509_dict[mcert_type] = x509_cert
else:
x509_dict['unknown'] = x509_list.get('ds:KeyInfo').get('ds:X509Data').get('ds:X509Certificate')

# If 'all' certificate types are requested then return a list with all the certificates
if cert_type == 'all':
for x509_elem_obj in x509_list:
# if there is no certificate type then x509_elem_obj will not actually be a dictionary
if isinstance(x509_elem_obj, dict):
mcert_type = ['unknown', x509_elem_obj.get('@use')]['@use' in x509_elem_obj]
x509_dict[mcert_type] = x509_elem_obj.get('ds:KeyInfo').get('ds:X509Data').get(
'ds:X509Certificate')
else:
x509_dict['unknown'] = x509_list.get('ds:KeyInfo').get('ds:X509Data').get('ds:X509Certificate')
return x509_dict
else: # If not then return the certificate of the type requested
for x509_elem_obj in x509_list:
# if there is no certificate type then x509_elem_obj will not actually be a dictionary
if isinstance(x509_elem_obj, dict):
if x509_elem_obj.get('@use') != cert_type:
continue
x509_dict[x509_elem_obj.get('@use')] = x509_elem_obj.get('ds:KeyInfo').get('ds:X509Data').get(
'ds:X509Certificate')
else:
x509_dict['unknown'] = x509_list.get('ds:KeyInfo').get('ds:X509Data').get('ds:X509Certificate')
return x509_dict
# If no Certificate available raise an exception
raise Exception("No X509 certificate of type:%s found" % cert_type)

# If a specific certificate type is requested then return the specific certificate or 'unknown'
valid_types = {cert_type, 'unknown'}
if not valid_types & set(x509_dict.keys()):
raise Exception("No X509 certificate of type:%s found" % cert_type)
return {t: cert for t, cert in x509_dict.items() if t == 'unknown' or t == cert_type}

except Exception as e:
# Log the title of the view
raise Exception(e.args[0]) from e
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from os import path

__name__ = 'rciam_probes'
__version__ = '2.1.1'
__version__ = '2.2.0'

here = path.abspath(path.dirname(__file__))

Expand Down

0 comments on commit 8ea2e42

Please sign in to comment.