From f72177ce6b2687836afeccf44a1c684fbcc6a3e5 Mon Sep 17 00:00:00 2001 From: "Ja (Thanakul) Wattanawong" Date: Sat, 20 Apr 2019 17:17:47 -0700 Subject: [PATCH 1/2] separate user_attrs into privileged so other methods can run --- ocflib/account/creation.py | 2 +- ocflib/account/search.py | 5 +++++ ocflib/vhost/web.py | 8 ++++---- tests/account/creation_test.py | 4 ++-- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/ocflib/account/creation.py b/ocflib/account/creation.py index 590f3cff..3c1e21a9 100644 --- a/ocflib/account/creation.py +++ b/ocflib/account/creation.py @@ -274,7 +274,7 @@ def validate_calnet_uid(uid): raise ValidationError( 'CalNet UID already has account: ' + str(existing_accounts)) - attrs = search.user_attrs_ucb(uid) + attrs = search.user_attrs_ucb_privileged(uid) if not attrs: raise ValidationError("CalNet UID can't be found in university LDAP.") diff --git a/ocflib/account/search.py b/ocflib/account/search.py index 50161d9e..4ecbb891 100644 --- a/ocflib/account/search.py +++ b/ocflib/account/search.py @@ -56,6 +56,11 @@ def user_attrs_ucb(uid): base=UCB_LDAP_PEOPLE) +def user_attrs_ucb_privileged(uid): + return user_attrs(uid, connection=ldap.ldap_ucb_privileged, + base=UCB_LDAP_PEOPLE) + + def user_exists(account): """Returns whether username is an OCF account.""" return bool(user_attrs(account)) diff --git a/ocflib/vhost/web.py b/ocflib/vhost/web.py index e294b81e..f145417d 100644 --- a/ocflib/vhost/web.py +++ b/ocflib/vhost/web.py @@ -3,7 +3,7 @@ import requests from ocflib.account.search import user_attrs -from ocflib.account.search import user_attrs_ucb +from ocflib.account.search import user_attrs_ucb_privileged VHOST_DB_PATH = '/home/s/st/staff/vhost/vhost.conf' VHOST_DB_URL = 'https://www.ocf.berkeley.edu/~staff/vhost.conf' @@ -86,9 +86,9 @@ def eligible_for_vhost(user): if 'callinkOid' in attrs: return True elif 'calnetUid' in attrs: - attrs_ucb = user_attrs_ucb(attrs['calnetUid']) - # TODO: Uncomment when we get a privileged LDAP bind. - if attrs_ucb: # and 'EMPLOYEE-TYPE-ACADEMIC' in attrs_ucb['berkeleyEduAffiliations']: + attrs_ucb = user_attrs_ucb_privileged(attrs['calnetUid']) + + if attrs_ucb and 'EMPLOYEE-TYPE-ACADEMIC' in attrs_ucb['berkeleyEduAffiliations']: return True return False diff --git a/tests/account/creation_test.py b/tests/account/creation_test.py index dc35324c..18d6db1a 100644 --- a/tests/account/creation_test.py +++ b/tests/account/creation_test.py @@ -444,7 +444,7 @@ def fake_credentials(mock_rsa_key): @pytest.yield_fixture def mock_valid_calnet_uid(): with mock.patch( - 'ocflib.account.search.user_attrs_ucb', + 'ocflib.account.search.user_attrs_ucb_privileged', return_value={'berkeleyEduAffiliations': ['STUDENT-TYPE-REGISTERED']} ): yield @@ -453,7 +453,7 @@ def mock_valid_calnet_uid(): @pytest.yield_fixture def mock_invalid_calnet_uid(): with mock.patch( - 'ocflib.account.search.user_attrs_ucb', + 'ocflib.account.search.user_attrs_ucb_privileged', return_value={'berkeleyEduAffiliations': ['STUDENT-STATUS-EXPIRED']}, ): yield From e93fa9b7d0864880bacd7e5846a7a27c5db8a811 Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Mon, 22 Apr 2019 11:39:31 -0700 Subject: [PATCH 2/2] Revert "Revert "Merge pull request #157 from ja5087/privileged_ldap"" This reverts commit bec763573a74fad348e6c43254009bb79a1b472c. --- ocflib/account/creation.py | 9 +++---- ocflib/account/search.py | 6 ++--- ocflib/infra/ldap.py | 47 +++++++++++++++++++++++++------- tests-manual/account/search | 12 +++++++++ tests/account/creation_test.py | 35 +++++++++++++++++++----- tests/account/search_test.py | 13 --------- tests/ucb/directory_test.py | 7 ----- tests/vhost/web_test.py | 49 +++++++++++++++++++++++++++++----- 8 files changed, 127 insertions(+), 51 deletions(-) diff --git a/ocflib/account/creation.py b/ocflib/account/creation.py index 3c1e21a9..600a79bb 100644 --- a/ocflib/account/creation.py +++ b/ocflib/account/creation.py @@ -279,12 +279,11 @@ def validate_calnet_uid(uid): if not attrs: raise ValidationError("CalNet UID can't be found in university LDAP.") - # TODO: Uncomment when we get privileged LDAP bind. # check if user is eligible for an account - # affiliations = attrs['berkeleyEduAffiliations'] - # if not eligible_for_account(affiliations): - # raise ValidationWarning( - # 'Affiliate type not eligible for account: ' + str(affiliations)) + affiliations = attrs['berkeleyEduAffiliations'] + if not eligible_for_account(affiliations): + raise ValidationWarning( + 'Affiliate type not eligible for account: ' + str(affiliations)) def eligible_for_account(affiliations): diff --git a/ocflib/account/search.py b/ocflib/account/search.py index 4ecbb891..6fd4a01c 100644 --- a/ocflib/account/search.py +++ b/ocflib/account/search.py @@ -32,7 +32,7 @@ def users_by_callink_oid(callink_oid): return users_by_filter('(callinkOid={})'.format(callink_oid)) -def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE): +def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE, dn=None, password=None): """Returns a dictionary of LDAP attributes for a given LDAP UID. The returned dictionary looks like: @@ -44,7 +44,7 @@ def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE): Returns None if no account exists with uid=user_account. """ - with connection() as c: + with connection(dn, password) as c: c.search(base, '(uid={})'.format(uid), attributes=ldap3.ALL_ATTRIBUTES) if len(c.response) > 0: @@ -52,7 +52,7 @@ def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE): def user_attrs_ucb(uid): - return user_attrs(uid, connection=ldap.ldap_ucb, + return user_attrs(uid, connection=ldap.ldap_ucb_privileged, base=UCB_LDAP_PEOPLE) diff --git a/ocflib/infra/ldap.py b/ocflib/infra/ldap.py index b96c2bbe..3d1d10f1 100644 --- a/ocflib/infra/ldap.py +++ b/ocflib/infra/ldap.py @@ -21,11 +21,13 @@ UCB_LDAP = 'ldap.berkeley.edu' UCB_LDAP_URL = 'ldaps://' + UCB_LDAP UCB_LDAP_PEOPLE = 'ou=People,dc=Berkeley,dc=EDU' +UCB_LDAP_DN = 'uid=ocf,ou=applications,dc=berkeley,dc=edu' +UCB_LDAP_PASSWORD_PATH = '/etc/ucbldap.passwd' @contextmanager -def ldap_connection(host): - """Context manager that provides an ldap3 Connection. +def ldap_connection(host, dn=None, password=None): + """Context manager that provides an ldap3 Connection. Also supports optional credentials for a privileged bind. Example usage: @@ -37,31 +39,47 @@ def ldap_connection(host): :param host: server hostname """ + server = ldap3.Server(host, use_ssl=True) - with ldap3.Connection(server) as connection: + with ldap3.Connection(server, dn, password) as connection: yield connection -def ldap_ocf(): - """Context manager that provides an ldap3 Connection to OCF's LDAP server. +def ldap_ocf(dn=None, password=None): + """Context manager that provides an ldap3 Connection to OCF's LDAP server. Accepts optional DN and password. Example usage: with ldap_ocf() as c: c.search(OCF_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber']) """ - return ldap_connection(OCF_LDAP) + return ldap_connection(OCF_LDAP, dn, password) -def ldap_ucb(): - """Context manager that provides an ldap3 Connection to the campus LDAP. +def ldap_ucb(dn=None, password=None): + """Context manager that provides an ldap3 Connection to the campus LDAP. Accepts optional DN and password. Example usage: with ldap_ucb() as c: c.search(UCB_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber']) """ - return ldap_connection(UCB_LDAP) + return ldap_connection(UCB_LDAP, dn, password) + + +def ldap_ucb_privileged(dn=None, password=None): + """Context manager that provides a privileged ldap3 Connection to the campus LDAP. + + Note that this method will ignore all dn and password arguments, + which are being kept for compatibility with user_attrs(). + + Example usage: + + with ldap_ucb_privileged() as c: + c.search(UCB_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber']) + """ + password = _read_ucb_password() + return ldap_ucb(UCB_LDAP_DN, password) def _format_attr(key, values): @@ -226,3 +244,14 @@ def format_timestamp(timestamp): if timestamp.tzinfo is None or timestamp.tzinfo.utcoffset(timestamp) is None: raise ValueError('Timestamp has no timezone info') return timestamp.strftime('%Y%m%d%H%M%S%z') + + +def _read_ucb_password(): + """Returns a string of the current campus LDAP privileged bind password + found in UCB_LDAP_PASSWORD_PATH + + :return: A string of the campus LDAP bind password + """ + + with open(UCB_LDAP_PASSWORD_PATH, 'r') as passwordFile: + return passwordFile.read() diff --git a/tests-manual/account/search b/tests-manual/account/search index eb2f0a0f..94d12071 100755 --- a/tests-manual/account/search +++ b/tests-manual/account/search @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import ocflib.account.search as search +import ocflib.ucb.directory as directory if __name__ == '__main__': print('Users with CalNet UID 1034192: {}'.format( @@ -19,3 +20,14 @@ if __name__ == '__main__': for user in ('ckuehl', 'ggroup'): print('User {} is group: {}'.format(user, search.user_is_group(user))) + + """ Note: the following tests will fail if the password file in + UCB_LDAP_PASSWORD_PATH does not exist or is invalid + """ + user = search.user_attrs_ucb(1101587) # jvperrin's uid + print('User with uid {} has uid {}'.format(user['uid'], str(1101587))) + + print('{} should be in user[\'objectClass\']: {}'.format('person', 'person' in user['objectClass'])) + + print('Real query with invalid uid 9999999 should be empty: {}'.format( + (None or '').lower() == (directory.name_by_calnet_uid(9999999) or '').lower())) diff --git a/tests/account/creation_test.py b/tests/account/creation_test.py index 18d6db1a..ddda7fd3 100644 --- a/tests/account/creation_test.py +++ b/tests/account/creation_test.py @@ -283,18 +283,17 @@ def test_long_names(self, _, __): class TestAccountEligibility: - @pytest.mark.parametrize('bad_uid', [ - 1101587, # good uid, but already has account - 9999999999, # fake uid, not in university ldap - ]) - def test_validate_calnet_uid_error(self, bad_uid): + def test_validate_calnet_uid_error(self, mock_existing_calnet_uid): + with pytest.raises(ValidationError): + validate_calnet_uid(1101587) + + def test_validate_nonexistent_calnet_uid(self, mock_nonexistent_calnet_uid): with pytest.raises(ValidationError): - validate_calnet_uid(bad_uid) + validate_calnet_uid(9999999999) def test_validate_calnet_uid_success(self, mock_valid_calnet_uid): validate_calnet_uid(9999999999999) - @pytest.mark.skip(reason='Checking for affiliations temp. patched out (ocflib PR 140)') def test_validate_calnet_affiliations_failure(self, mock_invalid_calnet_uid): with pytest.raises(ValidationWarning): validate_calnet_uid(9999999999999) @@ -441,6 +440,28 @@ def fake_credentials(mock_rsa_key): ) +@pytest.yield_fixture +def mock_existing_calnet_uid(): + with mock.patch( + 'ocflib.account.search.users_by_calnet_uid', + return_value=['not empty'] + ): + yield + + +@pytest.yield_fixture +def mock_nonexistent_calnet_uid(): + with mock.patch( + 'ocflib.account.search.users_by_calnet_uid', + return_value=None + ): + with mock.patch( + 'ocflib.account.search.user_attrs_ucb', + return_value=None + ): + yield + + @pytest.yield_fixture def mock_valid_calnet_uid(): with mock.patch( diff --git a/tests/account/search_test.py b/tests/account/search_test.py index 4f9f9eae..de17dee1 100644 --- a/tests/account/search_test.py +++ b/tests/account/search_test.py @@ -2,14 +2,12 @@ from ldap3.core.exceptions import LDAPAttributeError from ocflib.account.search import user_attrs -from ocflib.account.search import user_attrs_ucb from ocflib.account.search import user_exists from ocflib.account.search import user_is_group from ocflib.account.search import user_is_sorried from ocflib.account.search import users_by_callink_oid from ocflib.account.search import users_by_calnet_uid from ocflib.account.search import users_by_filter -from tests.conftest import TEST_PERSON_CALNET_UID class TestUsersByFilter: @@ -61,17 +59,6 @@ def test_nonexistent_user(self): assert user_attrs('doesnotexist') is None -class TestUserAttrsUCB: - - def test_existing_user(self, test_uid=TEST_PERSON_CALNET_UID): - user = user_attrs_ucb(test_uid) - assert user['uid'] == [str(test_uid)] - assert 'person' in user['objectClass'] - - def test_nonexistent_user(self): - assert user_attrs_ucb(9999999999) is None - - @pytest.mark.parametrize('user,exists', [ ('ckuehl', True), ('bpreview', True), diff --git a/tests/ucb/directory_test.py b/tests/ucb/directory_test.py index 5162026c..ba03d368 100644 --- a/tests/ucb/directory_test.py +++ b/tests/ucb/directory_test.py @@ -23,13 +23,6 @@ def test_name_by_calnet_uid(self, attrs, expected): ): assert name_by_calnet_uid(0) == expected - @pytest.mark.parametrize('uid,expected', [ - (TEST_PERSON_CALNET_UID, TEST_PERSON_NAME), - (9999999, None), - ]) - def test_name_by_calnet_uid_real_query(self, uid, expected): - assert (name_by_calnet_uid(uid) or '').lower() == (expected or '').lower() - class TestCalNetUIDsByName: diff --git a/tests/vhost/web_test.py b/tests/vhost/web_test.py index 2f649e30..b4583757 100644 --- a/tests/vhost/web_test.py +++ b/tests/vhost/web_test.py @@ -53,6 +53,37 @@ def mock_get_vhosts_db(): yield +@pytest.yield_fixture +def mock_group_user_attrs(): + with mock.patch( + 'ocflib.vhost.web.user_attrs', + return_value={'callinkOid': ['0']} + ): + yield + + +@pytest.yield_fixture +def mock_staff_ucb_attrs(): + with mock.patch( + 'ocflib.vhost.web.user_attrs_ucb', + return_value={'berkeleyEduAffiliations': ['EMPLOYEE-TYPE-ACADEMIC']} + ): + with mock.patch( + 'ocflib.vhost.web.user_attrs', + return_value={'calnetUid': ['0']} + ): + yield + + +@pytest.yield_fixture +def mock_user_attrs_uneligible(): + with mock.patch( + 'ocflib.vhost.web.user_attrs_ucb', + return_value=None + ): + yield + + class TestVirtualHosts: def test_reads_file_if_exists(self): @@ -84,10 +115,14 @@ def test_proper_parse(self, mock_get_vhosts_db): def test_has_vhost(self, user, should_have_vhost, mock_get_vhosts_db): assert has_vhost(user) == should_have_vhost - @pytest.mark.parametrize('user,should_be_eligible', [ - ('mattmcal', False), - ('ggroup', True), - ('bh', True), - ]) - def test_eligible_for_vhost(self, user, should_be_eligible): - assert eligible_for_vhost(user) == should_be_eligible + @pytest.mark.usefixtures('mock_group_user_attrs') + def test_groups_eligible_for_vhost(self): + assert eligible_for_vhost('ggroups') + + @pytest.mark.usefixtures('mock_staff_ucb_attrs') + def test_staff_eligible_for_vhost(self): + assert eligible_for_vhost('bh') + + @pytest.mark.usefixtures('mock_user_attrs_uneligible') + def test_not_eligible_for_vhost(self): + assert not eligible_for_vhost('mattmcal')