diff --git a/gen3cirrus/google_cloud/manager.py b/gen3cirrus/google_cloud/manager.py index fd080df3..1859c938 100644 --- a/gen3cirrus/google_cloud/manager.py +++ b/gen3cirrus/google_cloud/manager.py @@ -1360,6 +1360,35 @@ def add_member_to_group(self, member_email, group_id): return member_to_add + + def get_groups_for_user(self, member_email): + """ + Retrieves all groups that a user is a member of. + + Args: + member_email (str): email for member to get groups for + + Returns: + list: List of group email addresses that the user is a member of. + """ + all_group_emails = [] + response = self._admin_service.groups().list(userKey=member_email).execute() + groups = response.get('groups', []) + + group_emails = [group['email'] for group in groups] + all_group_emails.extend(group_emails) + + while response.get("nextPageToken"): + response = self._admin_service.groups().list(userKey=member_email, pageToken=response["nextPageToken"]).execute() + groups = response.get('groups', []) + + group_emails = [group['email'] for group in groups] + all_group_emails.extend(group_emails) + + + return all_group_emails + + def _is_member_in_group(self, member_email, group_id): member_emails = [ member.get("email", "") for member in self.get_group_members(group_id) diff --git a/test/test_google_cloud_manage.py b/test/test_google_cloud_manage.py index f6940798..ed3b9321 100644 --- a/test/test_google_cloud_manage.py +++ b/test/test_google_cloud_manage.py @@ -1636,6 +1636,102 @@ def __init__(self, status_numeral): assert 500 == execinfo.value.status_code assert "Failed to delete for unknown reason" in str(execinfo.value) +import copy + + +def test_get_groups_for_user(test_cloud_manager): + """ + Test get groups for a user calls google API with provided info and that response is returned. + """ + # Setup # + member_email = "user@example.com" + group_1_email = "group1@example.com" + group_2_email = "group2@example.com" + groups = [ + {"email": group_1_email}, + {"email": group_2_email}, + ] + full_response = { + "groups": groups, + "nextPageToken": None, + } + mock_config = { + "groups.return_value.list.return_value.execute.return_value": full_response + } + test_cloud_manager._admin_service.configure_mock(**mock_config) + + # Call # + group_emails = test_cloud_manager.get_groups_for_user(member_email) + + # Test # + assert len(group_emails) == 2 + assert group_1_email in group_emails + assert group_2_email in group_emails + + # Check if member email is somewhere in the args + args, kwargs = test_cloud_manager._admin_service.groups.return_value.list.call_args + assert any(member_email in str(arg) for arg in args) or any( + member_email in str(kwarg) for kwarg in kwargs.values() + ) + + +def test_get_groups_for_user_pagination(test_cloud_manager): + """ + Test that getting all groups for a user actually gets them all, even when pagination is required. + """ + # Setup # + member_email = "user@example.com" + group_1_email = "group1@example.com" + group_2_email = "group2@example.com" + group_3_email = "group3@example.com" + group_4_email = "group4@example.com" + groups_page_1 = [ + {"email": group_1_email}, + {"email": group_2_email}, + ] + next_page_token = "abcdefg" + full_response = { + "groups": groups_page_1, + "nextPageToken": next_page_token, + } + groups_page_2 = [ + {"email": group_3_email}, + {"email": group_4_email}, + ] + response_2 = { + "groups": groups_page_2, + "nextPageToken": None, + } + + two_pages = [full_response, response_2] + + mock_config = { + "groups.return_value.list.return_value.execute.side_effect": two_pages + } + + test_cloud_manager._admin_service.configure_mock(**mock_config) + + # Call # + group_emails = test_cloud_manager.get_groups_for_user(member_email) + + # Test # + assert len(group_emails) == 4 + assert group_1_email in group_emails + assert group_2_email in group_emails + assert group_3_email in group_emails + assert group_4_email in group_emails + + # Check if nextPageToken was passed correctly + args, kwargs = test_cloud_manager._admin_service.groups.return_value.list.call_args + assert kwargs["pageToken"] == next_page_token + + # Check if member email is somewhere in the args + args, kwargs = test_cloud_manager._admin_service.groups.return_value.list.call_args + assert any(member_email in str(arg) for arg in args) or any( + member_email in str(kwarg) for kwarg in kwargs.values() + ) + + if __name__ == "__main__": pytest.main(["-x", "-v", "."])