Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Privileged LDAP (again) #169

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions ocflib/account/creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,16 @@ def validate_calnet_uid(uid):
raise ValidationError(
'CalNet UID already has account: ' + str(existing_accounts))

attrs = search.user_attrs_ucb(uid)
attrs = search.user_attrs_ucb_privileged(uid)

if not attrs:
raise ValidationError("CalNet UID can't be found in university LDAP.")

# TODO: Uncomment when we get privileged LDAP bind.
# check if user is eligible for an account
# affiliations = attrs['berkeleyEduAffiliations']
# if not eligible_for_account(affiliations):
# raise ValidationWarning(
# 'Affiliate type not eligible for account: ' + str(affiliations))
affiliations = attrs['berkeleyEduAffiliations']
if not eligible_for_account(affiliations):
raise ValidationWarning(
'Affiliate type not eligible for account: ' + str(affiliations))


def eligible_for_account(affiliations):
Expand Down
11 changes: 8 additions & 3 deletions ocflib/account/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def users_by_callink_oid(callink_oid):
return users_by_filter('(callinkOid={})'.format(callink_oid))


def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE):
def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE, dn=None, password=None):
"""Returns a dictionary of LDAP attributes for a given LDAP UID.

The returned dictionary looks like:
Expand All @@ -44,15 +44,20 @@ def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE):

Returns None if no account exists with uid=user_account.
"""
with connection() as c:
with connection(dn, password) as c:
c.search(base, '(uid={})'.format(uid), attributes=ldap3.ALL_ATTRIBUTES)

if len(c.response) > 0:
return c.response[0]['attributes']


def user_attrs_ucb(uid):
return user_attrs(uid, connection=ldap.ldap_ucb,
return user_attrs(uid, connection=ldap.ldap_ucb_privileged,
base=UCB_LDAP_PEOPLE)


def user_attrs_ucb_privileged(uid):
return user_attrs(uid, connection=ldap.ldap_ucb_privileged,
base=UCB_LDAP_PEOPLE)


Expand Down
47 changes: 38 additions & 9 deletions ocflib/infra/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
UCB_LDAP = 'ldap.berkeley.edu'
UCB_LDAP_URL = 'ldaps://' + UCB_LDAP
UCB_LDAP_PEOPLE = 'ou=People,dc=Berkeley,dc=EDU'
UCB_LDAP_DN = 'uid=ocf,ou=applications,dc=berkeley,dc=edu'
UCB_LDAP_PASSWORD_PATH = '/etc/ucbldap.passwd'


@contextmanager
def ldap_connection(host):
"""Context manager that provides an ldap3 Connection.
def ldap_connection(host, dn=None, password=None):
"""Context manager that provides an ldap3 Connection. Also supports optional credentials for a privileged bind.

Example usage:

Expand All @@ -37,31 +39,47 @@ def ldap_connection(host):

:param host: server hostname
"""

server = ldap3.Server(host, use_ssl=True)
with ldap3.Connection(server) as connection:
with ldap3.Connection(server, dn, password) as connection:
yield connection


def ldap_ocf():
"""Context manager that provides an ldap3 Connection to OCF's LDAP server.
def ldap_ocf(dn=None, password=None):
"""Context manager that provides an ldap3 Connection to OCF's LDAP server. Accepts optional DN and password.

Example usage:

with ldap_ocf() as c:
c.search(OCF_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber'])
"""
return ldap_connection(OCF_LDAP)
return ldap_connection(OCF_LDAP, dn, password)


def ldap_ucb():
"""Context manager that provides an ldap3 Connection to the campus LDAP.
def ldap_ucb(dn=None, password=None):
"""Context manager that provides an ldap3 Connection to the campus LDAP. Accepts optional DN and password.

Example usage:

with ldap_ucb() as c:
c.search(UCB_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber'])
"""
return ldap_connection(UCB_LDAP)
return ldap_connection(UCB_LDAP, dn, password)


def ldap_ucb_privileged(dn=None, password=None):
"""Context manager that provides a privileged ldap3 Connection to the campus LDAP.

Note that this method will ignore all dn and password arguments,
which are being kept for compatibility with user_attrs().

Example usage:

with ldap_ucb_privileged() as c:
c.search(UCB_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber'])
"""
password = _read_ucb_password()
return ldap_ucb(UCB_LDAP_DN, password)


def _format_attr(key, values):
Expand Down Expand Up @@ -226,3 +244,14 @@ def format_timestamp(timestamp):
if timestamp.tzinfo is None or timestamp.tzinfo.utcoffset(timestamp) is None:
raise ValueError('Timestamp has no timezone info')
return timestamp.strftime('%Y%m%d%H%M%S%z')


def _read_ucb_password():
"""Returns a string of the current campus LDAP privileged bind password
found in UCB_LDAP_PASSWORD_PATH

:return: A string of the campus LDAP bind password
"""

with open(UCB_LDAP_PASSWORD_PATH, 'r') as passwordFile:
return passwordFile.read()
8 changes: 4 additions & 4 deletions ocflib/vhost/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import requests

from ocflib.account.search import user_attrs
from ocflib.account.search import user_attrs_ucb
from ocflib.account.search import user_attrs_ucb_privileged

VHOST_DB_PATH = '/home/s/st/staff/vhost/vhost.conf'
VHOST_DB_URL = 'https://www.ocf.berkeley.edu/~staff/vhost.conf'
Expand Down Expand Up @@ -86,9 +86,9 @@ def eligible_for_vhost(user):
if 'callinkOid' in attrs:
return True
elif 'calnetUid' in attrs:
attrs_ucb = user_attrs_ucb(attrs['calnetUid'])
# TODO: Uncomment when we get a privileged LDAP bind.
if attrs_ucb: # and 'EMPLOYEE-TYPE-ACADEMIC' in attrs_ucb['berkeleyEduAffiliations']:
attrs_ucb = user_attrs_ucb_privileged(attrs['calnetUid'])

if attrs_ucb and 'EMPLOYEE-TYPE-ACADEMIC' in attrs_ucb['berkeleyEduAffiliations']:
return True

return False
12 changes: 12 additions & 0 deletions tests-manual/account/search
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import ocflib.account.search as search
import ocflib.ucb.directory as directory

if __name__ == '__main__':
print('Users with CalNet UID 1034192: {}'.format(
Expand All @@ -19,3 +20,14 @@ if __name__ == '__main__':

for user in ('ckuehl', 'ggroup'):
print('User {} is group: {}'.format(user, search.user_is_group(user)))

""" Note: the following tests will fail if the password file in
UCB_LDAP_PASSWORD_PATH does not exist or is invalid
"""
user = search.user_attrs_ucb(1101587) # jvperrin's uid
print('User with uid {} has uid {}'.format(user['uid'], str(1101587)))

print('{} should be in user[\'objectClass\']: {}'.format('person', 'person' in user['objectClass']))

print('Real query with invalid uid 9999999 should be empty: {}'.format(
(None or '').lower() == (directory.name_by_calnet_uid(9999999) or '').lower()))
39 changes: 30 additions & 9 deletions tests/account/creation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,17 @@ def test_long_names(self, _, __):

class TestAccountEligibility:

@pytest.mark.parametrize('bad_uid', [
1101587, # good uid, but already has account
9999999999, # fake uid, not in university ldap
])
def test_validate_calnet_uid_error(self, bad_uid):
def test_validate_calnet_uid_error(self, mock_existing_calnet_uid):
with pytest.raises(ValidationError):
validate_calnet_uid(1101587)

def test_validate_nonexistent_calnet_uid(self, mock_nonexistent_calnet_uid):
with pytest.raises(ValidationError):
validate_calnet_uid(bad_uid)
validate_calnet_uid(9999999999)

def test_validate_calnet_uid_success(self, mock_valid_calnet_uid):
validate_calnet_uid(9999999999999)

@pytest.mark.skip(reason='Checking for affiliations temp. patched out (ocflib PR 140)')
def test_validate_calnet_affiliations_failure(self, mock_invalid_calnet_uid):
with pytest.raises(ValidationWarning):
validate_calnet_uid(9999999999999)
Expand Down Expand Up @@ -441,10 +440,32 @@ def fake_credentials(mock_rsa_key):
)


@pytest.yield_fixture
def mock_existing_calnet_uid():
with mock.patch(
'ocflib.account.search.users_by_calnet_uid',
return_value=['not empty']
):
yield


@pytest.yield_fixture
def mock_nonexistent_calnet_uid():
with mock.patch(
'ocflib.account.search.users_by_calnet_uid',
return_value=None
):
with mock.patch(
'ocflib.account.search.user_attrs_ucb',
return_value=None
):
yield


@pytest.yield_fixture
def mock_valid_calnet_uid():
with mock.patch(
'ocflib.account.search.user_attrs_ucb',
'ocflib.account.search.user_attrs_ucb_privileged',
return_value={'berkeleyEduAffiliations': ['STUDENT-TYPE-REGISTERED']}
):
yield
Expand All @@ -453,7 +474,7 @@ def mock_valid_calnet_uid():
@pytest.yield_fixture
def mock_invalid_calnet_uid():
with mock.patch(
'ocflib.account.search.user_attrs_ucb',
'ocflib.account.search.user_attrs_ucb_privileged',
return_value={'berkeleyEduAffiliations': ['STUDENT-STATUS-EXPIRED']},
):
yield
Expand Down
13 changes: 0 additions & 13 deletions tests/account/search_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
from ldap3.core.exceptions import LDAPAttributeError

from ocflib.account.search import user_attrs
from ocflib.account.search import user_attrs_ucb
from ocflib.account.search import user_exists
from ocflib.account.search import user_is_group
from ocflib.account.search import user_is_sorried
from ocflib.account.search import users_by_callink_oid
from ocflib.account.search import users_by_calnet_uid
from ocflib.account.search import users_by_filter
from tests.conftest import TEST_PERSON_CALNET_UID


class TestUsersByFilter:
Expand Down Expand Up @@ -61,17 +59,6 @@ def test_nonexistent_user(self):
assert user_attrs('doesnotexist') is None


class TestUserAttrsUCB:

def test_existing_user(self, test_uid=TEST_PERSON_CALNET_UID):
user = user_attrs_ucb(test_uid)
assert user['uid'] == [str(test_uid)]
assert 'person' in user['objectClass']

def test_nonexistent_user(self):
assert user_attrs_ucb(9999999999) is None


@pytest.mark.parametrize('user,exists', [
('ckuehl', True),
('bpreview', True),
Expand Down
7 changes: 0 additions & 7 deletions tests/ucb/directory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ def test_name_by_calnet_uid(self, attrs, expected):
):
assert name_by_calnet_uid(0) == expected

@pytest.mark.parametrize('uid,expected', [
(TEST_PERSON_CALNET_UID, TEST_PERSON_NAME),
(9999999, None),
])
def test_name_by_calnet_uid_real_query(self, uid, expected):
assert (name_by_calnet_uid(uid) or '').lower() == (expected or '').lower()


class TestCalNetUIDsByName:

Expand Down
49 changes: 42 additions & 7 deletions tests/vhost/web_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ def mock_get_vhosts_db():
yield


@pytest.yield_fixture
def mock_group_user_attrs():
with mock.patch(
'ocflib.vhost.web.user_attrs',
return_value={'callinkOid': ['0']}
):
yield


@pytest.yield_fixture
def mock_staff_ucb_attrs():
with mock.patch(
'ocflib.vhost.web.user_attrs_ucb',
return_value={'berkeleyEduAffiliations': ['EMPLOYEE-TYPE-ACADEMIC']}
):
with mock.patch(
'ocflib.vhost.web.user_attrs',
return_value={'calnetUid': ['0']}
):
yield


@pytest.yield_fixture
def mock_user_attrs_uneligible():
with mock.patch(
'ocflib.vhost.web.user_attrs_ucb',
return_value=None
):
yield


class TestVirtualHosts:

def test_reads_file_if_exists(self):
Expand Down Expand Up @@ -84,10 +115,14 @@ def test_proper_parse(self, mock_get_vhosts_db):
def test_has_vhost(self, user, should_have_vhost, mock_get_vhosts_db):
assert has_vhost(user) == should_have_vhost

@pytest.mark.parametrize('user,should_be_eligible', [
('mattmcal', False),
('ggroup', True),
('bh', True),
])
def test_eligible_for_vhost(self, user, should_be_eligible):
assert eligible_for_vhost(user) == should_be_eligible
@pytest.mark.usefixtures('mock_group_user_attrs')
def test_groups_eligible_for_vhost(self):
assert eligible_for_vhost('ggroups')

@pytest.mark.usefixtures('mock_staff_ucb_attrs')
def test_staff_eligible_for_vhost(self):
assert eligible_for_vhost('bh')

@pytest.mark.usefixtures('mock_user_attrs_uneligible')
def test_not_eligible_for_vhost(self):
assert not eligible_for_vhost('mattmcal')