diff --git a/awx/api/views.py b/awx/api/views.py index 2f7ca97739..0b942e5afc 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -775,20 +775,34 @@ class TeamRolesList(SubListCreateAttachDetachAPIView): return Response(data, status=status.HTTP_400_BAD_REQUEST) return super(type(self), self).post(request, *args, **kwargs) -class TeamProjectsList(SubListCreateAttachDetachAPIView): +class TeamProjectsList(SubListAPIView): model = Project serializer_class = ProjectSerializer parent_model = Team - relationship = 'projects' -class TeamCredentialsList(SubListCreateAttachDetachAPIView): + def get_queryset(self): + team = self.get_parent_object() + self.check_parent_access(team) + team_qs = Project.objects.filter(Q(member_role__parents=team.member_role) | Q(admin_role__parents=team.member_role)) + user_qs = Project.accessible_objects(self.request.user, {'read': True}) + return team_qs & user_qs + + +class TeamCredentialsList(SubListAPIView): model = Credential serializer_class = CredentialSerializer parent_model = Team - relationship = 'credentials' - parent_key = 'team' + + def get_queryset(self): + team = self.get_parent_object() + self.check_parent_access(team) + + visible_creds = Credential.accessible_objects(self.request.user, {'read': True}) + team_creds = Credential.objects.filter(owner_role__parents=team.member_role) + return team_creds & visible_creds + class TeamActivityStreamList(SubListAPIView): @@ -1041,7 +1055,6 @@ class UserProjectsList(SubListAPIView): model = Project serializer_class = ProjectSerializer parent_model = User - relationship = 'projects' def get_queryset(self): parent = self.get_parent_object() @@ -1050,13 +1063,19 @@ class UserProjectsList(SubListAPIView): user_qs = Project.accessible_objects(parent, {'read': True}) return my_qs & user_qs -class UserCredentialsList(SubListCreateAttachDetachAPIView): +class UserCredentialsList(SubListAPIView): model = Credential serializer_class = CredentialSerializer parent_model = User - relationship = 'credentials' - parent_key = 'user' + + def get_queryset(self): + user = self.get_parent_object() + self.check_parent_access(user) + + visible_creds = Credential.accessible_objects(self.request.user, {'read': True}) + user_creds = Credential.accessible_objects(user, {'read': True}) + return user_creds & visible_creds class UserOrganizationsList(SubListAPIView): diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index c2abe4ffd5..7d45c51fad 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -32,6 +32,7 @@ from awx.main.models.inventory import ( from awx.main.models.organization import ( Organization, Permission, + Team, ) from awx.main.models.rbac import Role @@ -102,6 +103,33 @@ def project(instance, organization): ) return prj +@pytest.fixture +def project_factory(organization): + def factory(name): + try: + prj = Project.objects.get(name=name) + except Project.DoesNotExist: + prj = Project.objects.create(name=name, + description="description for " + name, + scm_type="git", + scm_url="https://github.com/jlaska/ansible-playbooks", + organization=organization + ) + return prj + return factory + +@pytest.fixture +def team_factory(organization): + def factory(name): + try: + t = Team.objects.get(name=name) + except Team.DoesNotExist: + t = Team.objects.create(name=name, + description="description for " + name, + organization=organization) + return t + return factory + @pytest.fixture def user_project(user): owner = user('owner') diff --git a/awx/main/tests/functional/test_projects.py b/awx/main/tests/functional/test_projects.py new file mode 100644 index 0000000000..13f2a23129 --- /dev/null +++ b/awx/main/tests/functional/test_projects.py @@ -0,0 +1,97 @@ +import mock # noqa +import pytest + +from django.core.urlresolvers import reverse + + + +# +# Project listing and visibility tests +# + +@pytest.mark.django_db +def test_user_project_list(get, project_factory, admin, alice, bob): + 'List of projects a user has access to, filtered by projects you can also see' + + alice_project = project_factory('alice project') + alice_project.admin_role.members.add(alice) + + bob_project = project_factory('bob project') + bob_project.admin_role.members.add(bob) + + shared_project = project_factory('shared project') + shared_project.admin_role.members.add(alice) + shared_project.admin_role.members.add(bob) + + # admins can see all projects + assert get(reverse('api:user_projects_list', args=(admin.pk,)), admin).data['count'] == 3 + + # admins can see everyones projects + assert get(reverse('api:user_projects_list', args=(alice.pk,)), admin).data['count'] == 2 + assert get(reverse('api:user_projects_list', args=(bob.pk,)), admin).data['count'] == 2 + + # users can see their own projects + assert get(reverse('api:user_projects_list', args=(alice.pk,)), alice).data['count'] == 2 + + # alice should only be able to see the shared project when looking at bobs projects + assert get(reverse('api:user_projects_list', args=(bob.pk,)), alice).data['count'] == 1 + + # alice should see all projects they can see when viewing an admin + assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2 + + +@pytest.mark.django_db +def test_team_project_list(get, project_factory, team_factory, admin, alice, bob): + 'List of projects a team has access to, filtered by projects you can also see' + team1 = team_factory('team1') + team2 = team_factory('team2') + + team1_project = project_factory('team1 project') + team1_project.admin_role.parents.add(team1.member_role) + + team2_project = project_factory('team2 project') + team2_project.admin_role.parents.add(team2.member_role) + + shared_project = project_factory('shared project') + shared_project.admin_role.parents.add(team1.member_role) + shared_project.admin_role.parents.add(team2.member_role) + + team1.member_role.members.add(alice) + team2.member_role.members.add(bob) + + # admins can see all projects on a team + assert get(reverse('api:team_projects_list', args=(team1.pk,)), admin).data['count'] == 2 + assert get(reverse('api:team_projects_list', args=(team2.pk,)), admin).data['count'] == 2 + + # users can see all projects on teams they are a member of + assert get(reverse('api:team_projects_list', args=(team1.pk,)), alice).data['count'] == 2 + + # alice should not be able to see team2 projects because she doesn't have access to team2 + res = get(reverse('api:team_projects_list', args=(team2.pk,)), alice) + assert res.status_code == 403 + # but if she does, then she should only see the shared project + team2.auditor_role.members.add(alice) + assert get(reverse('api:team_projects_list', args=(team2.pk,)), alice).data['count'] == 1 + team2.auditor_role.members.remove(alice) + + + # Test user endpoints first, very similar tests to test_user_project_list + # but permissions are being derived from team membership instead. + + # admins can see all projects + assert get(reverse('api:user_projects_list', args=(admin.pk,)), admin).data['count'] == 3 + + # admins can see everyones projects + assert get(reverse('api:user_projects_list', args=(alice.pk,)), admin).data['count'] == 2 + assert get(reverse('api:user_projects_list', args=(bob.pk,)), admin).data['count'] == 2 + + # users can see their own projects + assert get(reverse('api:user_projects_list', args=(alice.pk,)), alice).data['count'] == 2 + + # alice should not be able to see bob + res = get(reverse('api:user_projects_list', args=(bob.pk,)), alice) + assert res.status_code == 403 + + # alice should see all projects they can see when viewing an admin + assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2 + diff --git a/awx/main/tests/old/projects.py b/awx/main/tests/old/projects.py index f938f34652..8b9cd1d87c 100644 --- a/awx/main/tests/old/projects.py +++ b/awx/main/tests/old/projects.py @@ -468,309 +468,7 @@ class ProjectsTest(BaseTransactionTest): got = self.get(url, expect=401) got = self.get(url, expect=200, auth=self.get_super_credentials()) - # ===================================================================== - # CREDENTIALS - other_creds = reverse('api:user_credentials_list', args=(other.pk,)) - team_creds = reverse('api:team_credentials_list', args=(team.pk,)) - - new_credentials = dict( - name = 'credential', - project = Project.objects.order_by('pk')[0].pk, - default_username = 'foo', - ssh_key_data = TEST_SSH_KEY_DATA_LOCKED, - ssh_key_unlock = TEST_SSH_KEY_DATA_UNLOCK, - ssh_password = 'narf', - sudo_password = 'troz', - security_token = '', - vault_password = None, - ) - - # can add credentials to a user (if user or org admin or super user) - self.post(other_creds, data=new_credentials, expect=401) - self.post(other_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials()) - new_credentials['team'] = team.pk - result = self.post(other_creds, data=new_credentials, expect=201, auth=self.get_super_credentials()) - cred_user = result['id'] - self.assertEqual(result['team'], None) - del new_credentials['team'] - new_credentials['name'] = 'credential2' - self.post(other_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials()) - new_credentials['name'] = 'credential3' - result = self.post(other_creds, data=new_credentials, expect=201, auth=self.get_other_credentials()) - new_credentials['name'] = 'credential4' - self.post(other_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials()) - - # can add credentials to a team - new_credentials['name'] = 'credential' - new_credentials['user'] = other.pk - self.post(team_creds, data=new_credentials, expect=401) - self.post(team_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials()) - result = self.post(team_creds, data=new_credentials, expect=201, auth=self.get_super_credentials()) - self.assertEqual(result['user'], None) - del new_credentials['user'] - new_credentials['name'] = 'credential2' - result = self.post(team_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials()) - new_credentials['name'] = 'credential3' - self.post(team_creds, data=new_credentials, expect=403, auth=self.get_other_credentials()) - self.post(team_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials()) - cred_team = result['id'] - - # can list credentials on a user - self.get(other_creds, expect=401) - self.get(other_creds, expect=401, auth=self.get_invalid_credentials()) - self.get(other_creds, expect=200, auth=self.get_super_credentials()) - self.get(other_creds, expect=200, auth=self.get_normal_credentials()) - self.get(other_creds, expect=200, auth=self.get_other_credentials()) - self.get(other_creds, expect=403, auth=self.get_nobody_credentials()) - - # can list credentials on a team - self.get(team_creds, expect=401) - self.get(team_creds, expect=401, auth=self.get_invalid_credentials()) - self.get(team_creds, expect=200, auth=self.get_super_credentials()) - self.get(team_creds, expect=200, auth=self.get_normal_credentials()) - self.get(team_creds, expect=403, auth=self.get_other_credentials()) - self.get(team_creds, expect=403, auth=self.get_nobody_credentials()) - - # Check /api/v1/credentials (GET) - url = reverse('api:credential_list') - with self.current_user(self.super_django_user): - self.options(url) - self.head(url) - response = self.get(url) - qs = Credential.objects.all() - self.check_pagination_and_size(response, qs.count()) - self.check_list_ids(response, qs) - - # POST should now work for all users. - with self.current_user(self.super_django_user): - data = dict(name='xyz', user=self.super_django_user.pk) - self.post(url, data, expect=201) - - # Repeating the same POST should violate a unique constraint. - with self.current_user(self.super_django_user): - data = dict(name='xyz', user=self.super_django_user.pk) - response = self.post(url, data, expect=400) - self.assertTrue('__all__' in response, response) - self.assertTrue('already exists' in response['__all__'][0], response) - - # Test with null where we expect a string value. Value will be coerced - # to an empty string. - with self.current_user(self.super_django_user): - data = dict(name='zyx', user=self.super_django_user.pk, kind='ssh', - become_username=None) - response = self.post(url, data, expect=201) - self.assertEqual(response['become_username'], '') - - # Test with encrypted ssh key and no unlock password. - with self.current_user(self.super_django_user): - data = dict(name='wxy', user=self.super_django_user.pk, kind='ssh', - ssh_key_data=TEST_SSH_KEY_DATA_LOCKED) - self.post(url, data, expect=400) - data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK - self.post(url, data, expect=201) - - # Test with invalid ssh key data. - with self.current_user(self.super_django_user): - bad_key_data = TEST_SSH_KEY_DATA.replace('PRIVATE', 'PUBLIC') - data = dict(name='wyx', user=self.super_django_user.pk, kind='ssh', - ssh_key_data=bad_key_data) - self.post(url, data, expect=400) - data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('-', '=') - self.post(url, data, expect=400) - data['ssh_key_data'] = '\n'.join(TEST_SSH_KEY_DATA.splitlines()[1:-1]) - self.post(url, data, expect=400) - data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('--B', '---B') - self.post(url, data, expect=400) - data['ssh_key_data'] = TEST_SSH_KEY_DATA - self.post(url, data, expect=201) - - # Test with OpenSSH format private key. - with self.current_user(self.super_django_user): - data = dict(name='openssh-unlocked', user=self.super_django_user.pk, kind='ssh', - ssh_key_data=TEST_OPENSSH_KEY_DATA) - self.post(url, data, expect=201) - - # Test with OpenSSH format private key that requires passphrase. - with self.current_user(self.super_django_user): - data = dict(name='openssh-locked', user=self.super_django_user.pk, kind='ssh', - ssh_key_data=TEST_OPENSSH_KEY_DATA_LOCKED) - self.post(url, data, expect=400) - data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK - self.post(url, data, expect=201) - - # Test post as organization admin where team is part of org, but user - # creating credential is not a member of the team. UI may pass user - # as an empty string instead of None. - normal_org = self.organizations[1] # normal user is an admin of this - org_team = normal_org.teams.create(name='new empty team') - with self.current_user(self.normal_django_user): - data = { - 'name': 'my team cred', - 'team': org_team.pk, - 'user': '', - } - self.post(url, data, expect=201) - - # FIXME: Check list as other users. - - # can edit a credential - cred_user = Credential.objects.get(pk=cred_user) - cred_team = Credential.objects.get(pk=cred_team) - d_cred_user = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=cred_user.user.pk) - d_cred_user2 = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=self.super_django_user.pk) - d_cred_team = dict(id=cred_team.pk, name='x', sudo_password='blippy', team=cred_team.team.pk) - edit_creds1 = reverse('api:credential_detail', args=(cred_user.pk,)) - edit_creds2 = reverse('api:credential_detail', args=(cred_team.pk,)) - - self.put(edit_creds1, data=d_cred_user, expect=401) - self.put(edit_creds1, data=d_cred_user, expect=401, auth=self.get_invalid_credentials()) - self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_super_credentials()) - self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_normal_credentials()) - - # We now allow credential to be reassigned (with the right permissions). - cred_put_u = self.put(edit_creds1, data=d_cred_user2, expect=200, auth=self.get_normal_credentials()) - self.put(edit_creds1, data=d_cred_user, expect=403, auth=self.get_other_credentials()) - - self.put(edit_creds2, data=d_cred_team, expect=401) - self.put(edit_creds2, data=d_cred_team, expect=401, auth=self.get_invalid_credentials()) - self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_super_credentials()) - cred_put_t = self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_normal_credentials()) - self.put(edit_creds2, data=d_cred_team, expect=403, auth=self.get_other_credentials()) - - # Reassign credential between team and user. - with self.current_user(self.super_django_user): - self.post(team_creds, data=dict(id=cred_user.pk), expect=204) - response = self.get(edit_creds1) - self.assertEqual(response['team'], team.pk) - self.assertEqual(response['user'], None) - self.post(other_creds, data=dict(id=cred_user.pk), expect=204) - response = self.get(edit_creds1) - self.assertEqual(response['team'], None) - self.assertEqual(response['user'], other.pk) - self.post(other_creds, data=dict(id=cred_team.pk), expect=204) - response = self.get(edit_creds2) - self.assertEqual(response['team'], None) - self.assertEqual(response['user'], other.pk) - self.post(team_creds, data=dict(id=cred_team.pk), expect=204) - response = self.get(edit_creds2) - self.assertEqual(response['team'], team.pk) - self.assertEqual(response['user'], None) - - cred_put_t['disassociate'] = 1 - team_url = reverse('api:team_credentials_list', args=(cred_put_t['team'],)) - self.post(team_url, data=cred_put_t, expect=204, auth=self.get_normal_credentials()) - - # can remove credentials from a user (via disassociate) - this will delete the credential. - cred_put_u['disassociate'] = 1 - url = cred_put_u['url'] - user_url = reverse('api:user_credentials_list', args=(cred_put_u['user'],)) - self.post(user_url, data=cred_put_u, expect=204, auth=self.get_normal_credentials()) - - # can delete a credential directly -- probably won't be used too often - #data = self.delete(url, expect=204, auth=self.get_other_credentials()) - data = self.delete(url, expect=404, auth=self.get_other_credentials()) - - # ===================================================================== - # PERMISSIONS - - user = self.other_django_user - team = Team.objects.order_by('pk')[0] - organization = Organization.objects.order_by('pk')[0] - inventory = Inventory.objects.create( - name = 'test inventory', - organization = organization, - created_by = self.super_django_user - ) - project = Project.objects.order_by('pk')[0] - - # can add permissions to a user - - user_permission = dict( - name='user can deploy a certain project to a certain inventory', - # user=user.pk, # no need to specify, this will be automatically filled in - inventory=inventory.pk, - project=project.pk, - permission_type=PERM_INVENTORY_DEPLOY, - run_ad_hoc_commands=None, - ) - team_permission = dict( - name='team can deploy a certain project to a certain inventory', - # team=team.pk, # no need to specify, this will be automatically filled in - inventory=inventory.pk, - project=project.pk, - permission_type=PERM_INVENTORY_DEPLOY, - ) - - url = reverse('api:user_permissions_list', args=(user.pk,)) - posted = self.post(url, user_permission, expect=201, auth=self.get_super_credentials()) - url2 = posted['url'] - user_perm_detail = posted['url'] - got = self.get(url2, expect=200, auth=self.get_other_credentials()) - - # cannot add permissions that apply to both team and user - url = reverse('api:user_permissions_list', args=(user.pk,)) - user_permission['name'] = 'user permission 2' - user_permission['team'] = team.pk - self.post(url, user_permission, expect=400, auth=self.get_super_credentials()) - - # cannot set admin/read/write permissions when a project is involved. - user_permission.pop('team') - user_permission['name'] = 'user permission 3' - user_permission['permission_type'] = PERM_INVENTORY_ADMIN - self.post(url, user_permission, expect=400, auth=self.get_super_credentials()) - - # project is required for a deployment permission - user_permission['name'] = 'user permission 4' - user_permission['permission_type'] = PERM_INVENTORY_DEPLOY - user_permission.pop('project') - self.post(url, user_permission, expect=400, auth=self.get_super_credentials()) - - # can add permissions on a team - url = reverse('api:team_permissions_list', args=(team.pk,)) - posted = self.post(url, team_permission, expect=201, auth=self.get_super_credentials()) - url2 = posted['url'] - # check we can get that permission back - got = self.get(url2, expect=200, auth=self.get_other_credentials()) - - # cannot add permissions that apply to both team and user - url = reverse('api:team_permissions_list', args=(team.pk,)) - team_permission['name'] += '2' - team_permission['user'] = user.pk - self.post(url, team_permission, expect=400, auth=self.get_super_credentials()) - del team_permission['user'] - - # can list permissions on a user - url = reverse('api:user_permissions_list', args=(user.pk,)) - got = self.get(url, expect=200, auth=self.get_super_credentials()) - got = self.get(url, expect=200, auth=self.get_other_credentials()) - got = self.get(url, expect=403, auth=self.get_nobody_credentials()) - - # can list permissions on a team - url = reverse('api:team_permissions_list', args=(team.pk,)) - got = self.get(url, expect=200, auth=self.get_super_credentials()) - got = self.get(url, expect=200, auth=self.get_other_credentials()) - got = self.get(url, expect=403, auth=self.get_nobody_credentials()) - - # can edit a permission -- reducing the permission level - team_permission['permission_type'] = PERM_INVENTORY_CHECK - self.put(url2, team_permission, expect=200, auth=self.get_super_credentials()) - self.put(url2, team_permission, expect=403, auth=self.get_other_credentials()) - - # can remove permissions - # do need to disassociate, just delete it - self.delete(url2, expect=403, auth=self.get_other_credentials()) - self.delete(url2, expect=204, auth=self.get_super_credentials()) - self.delete(user_perm_detail, expect=204, auth=self.get_super_credentials()) - self.delete(url2, expect=404, auth=self.get_other_credentials()) - - # User is still a team member - self.get(reverse('api:project_detail', args=(project.pk,)), expect=200, auth=self.get_other_credentials()) - - team.member_role.members.remove(self.other_django_user) - - # User is no longer a team member and has no permissions - self.get(reverse('api:project_detail', args=(project.pk,)), expect=403, auth=self.get_other_credentials()) @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,