diff --git a/.gitignore b/.gitignore index 1e37f87..6989ea0 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ docs/_build *.sqlite3 .coverage *.egg-info +.venv diff --git a/README.rst b/README.rst index 5b5e334..8c662bb 100644 --- a/README.rst +++ b/README.rst @@ -41,6 +41,19 @@ Available settings import ssl LDAP_AUTH_TLS_VERSION = ssl.PROTOCOL_TLSv1_2 + # Specifies if the server certificate must be validated, values can be: CERT_NONE (certificates are ignored), + # CERT_OPTIONAL (not required, but validated if provided) and CERT_REQUIRED (required and validated). The default + # is not secure and it is recommended that CERT_REQUIRED is used instead. + LDAP_AUTH_TLS_VALIDATE_CERT = None + + # LDAP_AUTH_TLS_CA_CERTS_FILE a file containing the certificates of trusted certification authorities. + LDAP_AUTH_TLS_CA_CERTS_FILE = None + + # String in the OpenSSL cipher list format specifying which ciphers must be used. See https://ldap3.readthedocs.io/en/latest/ssltls.html . It + # works on recent Python interpreters that allow to change the cipher in the SSLContext or in the the wrap_socket() + #method, it’s ignored on older versions. + LDAP_AUTH_TLS_CIPHERS = None + # The LDAP search base for looking up users. LDAP_AUTH_SEARCH_BASE = "ou=people,dc=example,dc=com" @@ -81,6 +94,9 @@ Available settings # Use this to support different types of LDAP server. LDAP_AUTH_FORMAT_USERNAME = "django_python3_ldap.utils.format_username_openldap" + # A single attribute or a list of attributes to be returned by LDAP operations. + LDAP_AUTH_ATTRIBUTES = ldap3.ALL_ATTRIBUTES + # Sets the login domain for Active Directory users. LDAP_AUTH_ACTIVE_DIRECTORY_DOMAIN = None diff --git a/django_python3_ldap/conf.py b/django_python3_ldap/conf.py index f598759..5530cbe 100644 --- a/django_python3_ldap/conf.py +++ b/django_python3_ldap/conf.py @@ -1,8 +1,7 @@ """ Settings used by django-python3. """ -from ssl import PROTOCOL_TLS - +import ldap3 from django.conf import settings @@ -46,7 +45,22 @@ def __init__(self, settings): LDAP_AUTH_TLS_VERSION = LazySetting( name="LDAP_AUTH_TLS_VERSION", - default=PROTOCOL_TLS, + default=None, + ) + + LDAP_AUTH_TLS_VALIDATE_CERT = LazySetting( + name="LDAP_AUTH_TLS_VALIDATE_CERT", + default=None, + ) + + LDAP_AUTH_TLS_CA_CERTS_FILE = LazySetting( + name="LDAP_AUTH_TLS_CA_CERTS_FILE", + default=None, + ) + + LDAP_AUTH_TLS_CIPHERS = LazySetting( + name="LDAP_AUTH_TLS_CIPHERS", + default=None, ) LDAP_AUTH_SEARCH_BASE = LazySetting( @@ -96,6 +110,11 @@ def __init__(self, settings): default="django_python3_ldap.utils.format_username_openldap", ) + LDAP_AUTH_ATTRIBUTES = LazySetting( + name="LDAP_AUTH_ATTRIBUTES", + default=ldap3.ALL_ATTRIBUTES, + ) + LDAP_AUTH_ACTIVE_DIRECTORY_DOMAIN = LazySetting( name="LDAP_AUTH_ACTIVE_DIRECTORY_DOMAIN", default=None, diff --git a/django_python3_ldap/ldap.py b/django_python3_ldap/ldap.py index 922e0cd..a8e95b9 100644 --- a/django_python3_ldap/ldap.py +++ b/django_python3_ldap/ldap.py @@ -48,7 +48,7 @@ def _get_or_create_user(self, user_data): user_fields = { field_name: ( attributes[attribute_name][0] - if isinstance(attributes[attribute_name], (list, tuple)) else + if isinstance(attributes[attribute_name], (list, tuple)) and len(attributes[attribute_name]) > 0 else attributes[attribute_name] ) for field_name, attribute_name @@ -97,7 +97,7 @@ def iter_users(self): search_base=settings.LDAP_AUTH_SEARCH_BASE, search_filter=format_search_filter({}), search_scope=ldap3.SUBTREE, - attributes=ldap3.ALL_ATTRIBUTES, + attributes=settings.LDAP_AUTH_ATTRIBUTES, get_operational_attributes=True, paged_size=30, ) @@ -133,13 +133,34 @@ def has_user(self, **kwargs): search_base=settings.LDAP_AUTH_SEARCH_BASE, search_filter=format_search_filter(kwargs), search_scope=ldap3.SUBTREE, - attributes=ldap3.ALL_ATTRIBUTES, + attributes=settings.LDAP_AUTH_ATTRIBUTES, get_operational_attributes=True, size_limit=1, ) return bool(len(self._connection.response) > 0 and self._connection.response[0].get("attributes")) +def get_tls_options(settings): + tls_options = {} + + if not settings.LDAP_AUTH_USE_TLS: + return None + + if settings.LDAP_AUTH_TLS_VALIDATE_CERT: + tls_options['validate'] = settings.LDAP_AUTH_TLS_VALIDATE_CERT + + if settings.LDAP_AUTH_TLS_CA_CERTS_FILE: + tls_options['ca_certs_file'] = settings.LDAP_AUTH_TLS_CA_CERTS_FILE + + if settings.LDAP_AUTH_TLS_VERSION: + tls_options['version'] = settings.LDAP_AUTH_TLS_VERSION + + if settings.LDAP_AUTH_TLS_CIPHERS: + tls_options['ciphers'] = settings.LDAP_AUTH_TLS_CIPHERS + + return (ldap3.Tls(**tls_options)) + + @contextmanager def connection(**kwargs): """ @@ -167,25 +188,19 @@ def connection(**kwargs): if not isinstance(auth_url, list): auth_url = [auth_url] for u in auth_url: - # Include SSL / TLS, if requested. - server_args = { - "allowed_referral_hosts": [("*", True)], - "get_info": ldap3.NONE, - "connect_timeout": settings.LDAP_AUTH_CONNECT_TIMEOUT, - } - if settings.LDAP_AUTH_USE_TLS: - server_args["tls"] = ldap3.Tls( - ciphers="ALL", - version=settings.LDAP_AUTH_TLS_VERSION, - ) server_pool.add( ldap3.Server( u, - **server_args, + allowed_referral_hosts=[("*", True)], + get_info=ldap3.NONE, + connect_timeout=settings.LDAP_AUTH_CONNECT_TIMEOUT, + tls=get_tls_options(settings), + use_ssl=settings.LDAP_AUTH_USE_TLS ) ) # Connect. try: + # Include SSL / TLS, if requested. connection_args = { "user": username, "password": password, diff --git a/django_python3_ldap/tests.py b/django_python3_ldap/tests.py index dc7cbe6..57dbe56 100644 --- a/django_python3_ldap/tests.py +++ b/django_python3_ldap/tests.py @@ -1,7 +1,8 @@ # encoding=utf-8 from __future__ import unicode_literals -from unittest import skipUnless, skip +import ssl +from unittest import skipUnless from io import StringIO from django.test import TestCase, override_settings @@ -116,16 +117,6 @@ def testRepeatedUserAuthenticationDoestRecreateUsers(self): # Ensure that the user isn't recreated on second access. self.assertEqual(user_1.pk, user_2.pk) - @skip("FIXME: test server currently uses outdated TLS cyphers") - def testAuthenticateWithTLS(self): - with self.settings(LDAP_AUTH_USE_TLS=True): - user = authenticate( - username=settings.LDAP_AUTH_TEST_USER_USERNAME, - password=settings.LDAP_AUTH_TEST_USER_PASSWORD, - ) - self.assertIsInstance(user, User) - self.assertEqual(user.username, settings.LDAP_AUTH_TEST_USER_USERNAME) - def testAuthenticateWithRebind(self): with self.settings( LDAP_AUTH_CONNECTION_USERNAME=settings.LDAP_AUTH_TEST_USER_USERNAME, @@ -412,3 +403,62 @@ def testReCleanUsersDoesntRecreateUsers(self): call_command("ldap_clean_users", verbosity=0, purge=True) user_count_2 = User.objects.count() self.assertEqual(user_count_1, user_count_2) + + +@skipUnless(settings.LDAP_AUTH_TEST_USER_USERNAME, "No settings.LDAP_AUTH_TEST_USER_USERNAME supplied.") +@skipUnless(settings.LDAP_AUTH_TEST_USER_PASSWORD, "No settings.LDAP_AUTH_TEST_USER_PASSWORD supplied.") +@skipUnless(settings.LDAP_AUTH_USER_LOOKUP_FIELDS == ("username",), "Cannot test using custom lookup fields.") +@skipUnless(django_settings.AUTH_USER_MODEL == "auth.User", "Cannot test using a custom user model.") +class TestAttrib(TestCase): + + def setUp(self): + super(TestAttrib, self).setUp() + User.objects.all().delete() + + def testSyncUsersCreatesUsers(self): + with self.settings( + LDAP_AUTH_ATTRIBUTES=['givenName', "homeDirectory", "uid"], + LDAP_AUTH_USER_FIELDS={ + "username": "uid", + "first_name": "givenName", + "last_name": "homeDirectory", + "email": "mail", + } + + ): + user = authenticate( + username=settings.LDAP_AUTH_TEST_USER_USERNAME, + password=settings.LDAP_AUTH_TEST_USER_PASSWORD, + ) + self.assertIsInstance(user, User) + self.assertEqual(user.last_name, 'home') + self.assertEqual(user.email, '') + + +@skipUnless(settings.LDAP_AUTH_TEST_USER_USERNAME, "No settings.LDAP_AUTH_TEST_USER_USERNAME supplied.") +@skipUnless(settings.LDAP_AUTH_TEST_USER_PASSWORD, "No settings.LDAP_AUTH_TEST_USER_PASSWORD supplied.") +@skipUnless(settings.LDAP_AUTH_USER_LOOKUP_FIELDS == ("username",), "Cannot test using custom lookup fields.") +@skipUnless(django_settings.AUTH_USER_MODEL == "auth.User", "Cannot test using a custom user model.") +class TestSSL(TestCase): + + def setUp(self): + super(TestSSL, self).setUp() + User.objects.all().delete() + + def testAuthenticateWithTLS(self): + with self.settings(LDAP_AUTH_USE_TLS=True, LDAP_AUTH_TLS_VALIDATE_CERT=ssl.CERT_NONE): + user = authenticate( + username=settings.LDAP_AUTH_TEST_USER_USERNAME, + password=settings.LDAP_AUTH_TEST_USER_PASSWORD, + ) + self.assertIsInstance(user, User) + self.assertEqual(user.username, settings.LDAP_AUTH_TEST_USER_USERNAME) + + # This should fail as server is presenting a self-signed certificate. + def test_validate_required_self_signed(self): + with self.settings(LDAP_AUTH_USE_TLS=True, LDAP_AUTH_TLS_VALIDATE_CERT=ssl.CERT_REQUIRED): + with self.assertRaises(Exception): + authenticate( + username='any', + password='any', + )