1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 23:51:09 +03:00

Moved a couple of test cases from old/projects.py tests to new test_projects.py tests

This commit is contained in:
Akita Noek 2016-03-23 14:47:01 -04:00
parent 9574c3b506
commit 9dbe9fb7ad
4 changed files with 153 additions and 311 deletions

View File

@ -775,20 +775,34 @@ class TeamRolesList(SubListCreateAttachDetachAPIView):
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(type(self), self).post(request, *args, **kwargs)
class TeamProjectsList(SubListCreateAttachDetachAPIView):
class TeamProjectsList(SubListAPIView):
model = Project
serializer_class = ProjectSerializer
parent_model = Team
relationship = 'projects'
class TeamCredentialsList(SubListCreateAttachDetachAPIView):
def get_queryset(self):
team = self.get_parent_object()
self.check_parent_access(team)
team_qs = Project.objects.filter(Q(member_role__parents=team.member_role) | Q(admin_role__parents=team.member_role))
user_qs = Project.accessible_objects(self.request.user, {'read': True})
return team_qs & user_qs
class TeamCredentialsList(SubListAPIView):
model = Credential
serializer_class = CredentialSerializer
parent_model = Team
relationship = 'credentials'
parent_key = 'team'
def get_queryset(self):
team = self.get_parent_object()
self.check_parent_access(team)
visible_creds = Credential.accessible_objects(self.request.user, {'read': True})
team_creds = Credential.objects.filter(owner_role__parents=team.member_role)
return team_creds & visible_creds
class TeamActivityStreamList(SubListAPIView):
@ -1041,7 +1055,6 @@ class UserProjectsList(SubListAPIView):
model = Project
serializer_class = ProjectSerializer
parent_model = User
relationship = 'projects'
def get_queryset(self):
parent = self.get_parent_object()
@ -1050,13 +1063,19 @@ class UserProjectsList(SubListAPIView):
user_qs = Project.accessible_objects(parent, {'read': True})
return my_qs & user_qs
class UserCredentialsList(SubListCreateAttachDetachAPIView):
class UserCredentialsList(SubListAPIView):
model = Credential
serializer_class = CredentialSerializer
parent_model = User
relationship = 'credentials'
parent_key = 'user'
def get_queryset(self):
user = self.get_parent_object()
self.check_parent_access(user)
visible_creds = Credential.accessible_objects(self.request.user, {'read': True})
user_creds = Credential.accessible_objects(user, {'read': True})
return user_creds & visible_creds
class UserOrganizationsList(SubListAPIView):

View File

@ -32,6 +32,7 @@ from awx.main.models.inventory import (
from awx.main.models.organization import (
Organization,
Permission,
Team,
)
from awx.main.models.rbac import Role
@ -102,6 +103,33 @@ def project(instance, organization):
)
return prj
@pytest.fixture
def project_factory(organization):
def factory(name):
try:
prj = Project.objects.get(name=name)
except Project.DoesNotExist:
prj = Project.objects.create(name=name,
description="description for " + name,
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks",
organization=organization
)
return prj
return factory
@pytest.fixture
def team_factory(organization):
def factory(name):
try:
t = Team.objects.get(name=name)
except Team.DoesNotExist:
t = Team.objects.create(name=name,
description="description for " + name,
organization=organization)
return t
return factory
@pytest.fixture
def user_project(user):
owner = user('owner')

View File

@ -0,0 +1,97 @@
import mock # noqa
import pytest
from django.core.urlresolvers import reverse
#
# Project listing and visibility tests
#
@pytest.mark.django_db
def test_user_project_list(get, project_factory, admin, alice, bob):
'List of projects a user has access to, filtered by projects you can also see'
alice_project = project_factory('alice project')
alice_project.admin_role.members.add(alice)
bob_project = project_factory('bob project')
bob_project.admin_role.members.add(bob)
shared_project = project_factory('shared project')
shared_project.admin_role.members.add(alice)
shared_project.admin_role.members.add(bob)
# admins can see all projects
assert get(reverse('api:user_projects_list', args=(admin.pk,)), admin).data['count'] == 3
# admins can see everyones projects
assert get(reverse('api:user_projects_list', args=(alice.pk,)), admin).data['count'] == 2
assert get(reverse('api:user_projects_list', args=(bob.pk,)), admin).data['count'] == 2
# users can see their own projects
assert get(reverse('api:user_projects_list', args=(alice.pk,)), alice).data['count'] == 2
# alice should only be able to see the shared project when looking at bobs projects
assert get(reverse('api:user_projects_list', args=(bob.pk,)), alice).data['count'] == 1
# alice should see all projects they can see when viewing an admin
assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2
@pytest.mark.django_db
def test_team_project_list(get, project_factory, team_factory, admin, alice, bob):
'List of projects a team has access to, filtered by projects you can also see'
team1 = team_factory('team1')
team2 = team_factory('team2')
team1_project = project_factory('team1 project')
team1_project.admin_role.parents.add(team1.member_role)
team2_project = project_factory('team2 project')
team2_project.admin_role.parents.add(team2.member_role)
shared_project = project_factory('shared project')
shared_project.admin_role.parents.add(team1.member_role)
shared_project.admin_role.parents.add(team2.member_role)
team1.member_role.members.add(alice)
team2.member_role.members.add(bob)
# admins can see all projects on a team
assert get(reverse('api:team_projects_list', args=(team1.pk,)), admin).data['count'] == 2
assert get(reverse('api:team_projects_list', args=(team2.pk,)), admin).data['count'] == 2
# users can see all projects on teams they are a member of
assert get(reverse('api:team_projects_list', args=(team1.pk,)), alice).data['count'] == 2
# alice should not be able to see team2 projects because she doesn't have access to team2
res = get(reverse('api:team_projects_list', args=(team2.pk,)), alice)
assert res.status_code == 403
# but if she does, then she should only see the shared project
team2.auditor_role.members.add(alice)
assert get(reverse('api:team_projects_list', args=(team2.pk,)), alice).data['count'] == 1
team2.auditor_role.members.remove(alice)
# Test user endpoints first, very similar tests to test_user_project_list
# but permissions are being derived from team membership instead.
# admins can see all projects
assert get(reverse('api:user_projects_list', args=(admin.pk,)), admin).data['count'] == 3
# admins can see everyones projects
assert get(reverse('api:user_projects_list', args=(alice.pk,)), admin).data['count'] == 2
assert get(reverse('api:user_projects_list', args=(bob.pk,)), admin).data['count'] == 2
# users can see their own projects
assert get(reverse('api:user_projects_list', args=(alice.pk,)), alice).data['count'] == 2
# alice should not be able to see bob
res = get(reverse('api:user_projects_list', args=(bob.pk,)), alice)
assert res.status_code == 403
# alice should see all projects they can see when viewing an admin
assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2

View File

@ -468,309 +468,7 @@ class ProjectsTest(BaseTransactionTest):
got = self.get(url, expect=401)
got = self.get(url, expect=200, auth=self.get_super_credentials())
# =====================================================================
# CREDENTIALS
other_creds = reverse('api:user_credentials_list', args=(other.pk,))
team_creds = reverse('api:team_credentials_list', args=(team.pk,))
new_credentials = dict(
name = 'credential',
project = Project.objects.order_by('pk')[0].pk,
default_username = 'foo',
ssh_key_data = TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock = TEST_SSH_KEY_DATA_UNLOCK,
ssh_password = 'narf',
sudo_password = 'troz',
security_token = '',
vault_password = None,
)
# can add credentials to a user (if user or org admin or super user)
self.post(other_creds, data=new_credentials, expect=401)
self.post(other_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials())
new_credentials['team'] = team.pk
result = self.post(other_creds, data=new_credentials, expect=201, auth=self.get_super_credentials())
cred_user = result['id']
self.assertEqual(result['team'], None)
del new_credentials['team']
new_credentials['name'] = 'credential2'
self.post(other_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials())
new_credentials['name'] = 'credential3'
result = self.post(other_creds, data=new_credentials, expect=201, auth=self.get_other_credentials())
new_credentials['name'] = 'credential4'
self.post(other_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials())
# can add credentials to a team
new_credentials['name'] = 'credential'
new_credentials['user'] = other.pk
self.post(team_creds, data=new_credentials, expect=401)
self.post(team_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials())
result = self.post(team_creds, data=new_credentials, expect=201, auth=self.get_super_credentials())
self.assertEqual(result['user'], None)
del new_credentials['user']
new_credentials['name'] = 'credential2'
result = self.post(team_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials())
new_credentials['name'] = 'credential3'
self.post(team_creds, data=new_credentials, expect=403, auth=self.get_other_credentials())
self.post(team_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials())
cred_team = result['id']
# can list credentials on a user
self.get(other_creds, expect=401)
self.get(other_creds, expect=401, auth=self.get_invalid_credentials())
self.get(other_creds, expect=200, auth=self.get_super_credentials())
self.get(other_creds, expect=200, auth=self.get_normal_credentials())
self.get(other_creds, expect=200, auth=self.get_other_credentials())
self.get(other_creds, expect=403, auth=self.get_nobody_credentials())
# can list credentials on a team
self.get(team_creds, expect=401)
self.get(team_creds, expect=401, auth=self.get_invalid_credentials())
self.get(team_creds, expect=200, auth=self.get_super_credentials())
self.get(team_creds, expect=200, auth=self.get_normal_credentials())
self.get(team_creds, expect=403, auth=self.get_other_credentials())
self.get(team_creds, expect=403, auth=self.get_nobody_credentials())
# Check /api/v1/credentials (GET)
url = reverse('api:credential_list')
with self.current_user(self.super_django_user):
self.options(url)
self.head(url)
response = self.get(url)
qs = Credential.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# POST should now work for all users.
with self.current_user(self.super_django_user):
data = dict(name='xyz', user=self.super_django_user.pk)
self.post(url, data, expect=201)
# Repeating the same POST should violate a unique constraint.
with self.current_user(self.super_django_user):
data = dict(name='xyz', user=self.super_django_user.pk)
response = self.post(url, data, expect=400)
self.assertTrue('__all__' in response, response)
self.assertTrue('already exists' in response['__all__'][0], response)
# Test with null where we expect a string value. Value will be coerced
# to an empty string.
with self.current_user(self.super_django_user):
data = dict(name='zyx', user=self.super_django_user.pk, kind='ssh',
become_username=None)
response = self.post(url, data, expect=201)
self.assertEqual(response['become_username'], '')
# Test with encrypted ssh key and no unlock password.
with self.current_user(self.super_django_user):
data = dict(name='wxy', user=self.super_django_user.pk, kind='ssh',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED)
self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
self.post(url, data, expect=201)
# Test with invalid ssh key data.
with self.current_user(self.super_django_user):
bad_key_data = TEST_SSH_KEY_DATA.replace('PRIVATE', 'PUBLIC')
data = dict(name='wyx', user=self.super_django_user.pk, kind='ssh',
ssh_key_data=bad_key_data)
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('-', '=')
self.post(url, data, expect=400)
data['ssh_key_data'] = '\n'.join(TEST_SSH_KEY_DATA.splitlines()[1:-1])
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA.replace('--B', '---B')
self.post(url, data, expect=400)
data['ssh_key_data'] = TEST_SSH_KEY_DATA
self.post(url, data, expect=201)
# Test with OpenSSH format private key.
with self.current_user(self.super_django_user):
data = dict(name='openssh-unlocked', user=self.super_django_user.pk, kind='ssh',
ssh_key_data=TEST_OPENSSH_KEY_DATA)
self.post(url, data, expect=201)
# Test with OpenSSH format private key that requires passphrase.
with self.current_user(self.super_django_user):
data = dict(name='openssh-locked', user=self.super_django_user.pk, kind='ssh',
ssh_key_data=TEST_OPENSSH_KEY_DATA_LOCKED)
self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
self.post(url, data, expect=201)
# Test post as organization admin where team is part of org, but user
# creating credential is not a member of the team. UI may pass user
# as an empty string instead of None.
normal_org = self.organizations[1] # normal user is an admin of this
org_team = normal_org.teams.create(name='new empty team')
with self.current_user(self.normal_django_user):
data = {
'name': 'my team cred',
'team': org_team.pk,
'user': '',
}
self.post(url, data, expect=201)
# FIXME: Check list as other users.
# can edit a credential
cred_user = Credential.objects.get(pk=cred_user)
cred_team = Credential.objects.get(pk=cred_team)
d_cred_user = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=cred_user.user.pk)
d_cred_user2 = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=self.super_django_user.pk)
d_cred_team = dict(id=cred_team.pk, name='x', sudo_password='blippy', team=cred_team.team.pk)
edit_creds1 = reverse('api:credential_detail', args=(cred_user.pk,))
edit_creds2 = reverse('api:credential_detail', args=(cred_team.pk,))
self.put(edit_creds1, data=d_cred_user, expect=401)
self.put(edit_creds1, data=d_cred_user, expect=401, auth=self.get_invalid_credentials())
self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_super_credentials())
self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_normal_credentials())
# We now allow credential to be reassigned (with the right permissions).
cred_put_u = self.put(edit_creds1, data=d_cred_user2, expect=200, auth=self.get_normal_credentials())
self.put(edit_creds1, data=d_cred_user, expect=403, auth=self.get_other_credentials())
self.put(edit_creds2, data=d_cred_team, expect=401)
self.put(edit_creds2, data=d_cred_team, expect=401, auth=self.get_invalid_credentials())
self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_super_credentials())
cred_put_t = self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_normal_credentials())
self.put(edit_creds2, data=d_cred_team, expect=403, auth=self.get_other_credentials())
# Reassign credential between team and user.
with self.current_user(self.super_django_user):
self.post(team_creds, data=dict(id=cred_user.pk), expect=204)
response = self.get(edit_creds1)
self.assertEqual(response['team'], team.pk)
self.assertEqual(response['user'], None)
self.post(other_creds, data=dict(id=cred_user.pk), expect=204)
response = self.get(edit_creds1)
self.assertEqual(response['team'], None)
self.assertEqual(response['user'], other.pk)
self.post(other_creds, data=dict(id=cred_team.pk), expect=204)
response = self.get(edit_creds2)
self.assertEqual(response['team'], None)
self.assertEqual(response['user'], other.pk)
self.post(team_creds, data=dict(id=cred_team.pk), expect=204)
response = self.get(edit_creds2)
self.assertEqual(response['team'], team.pk)
self.assertEqual(response['user'], None)
cred_put_t['disassociate'] = 1
team_url = reverse('api:team_credentials_list', args=(cred_put_t['team'],))
self.post(team_url, data=cred_put_t, expect=204, auth=self.get_normal_credentials())
# can remove credentials from a user (via disassociate) - this will delete the credential.
cred_put_u['disassociate'] = 1
url = cred_put_u['url']
user_url = reverse('api:user_credentials_list', args=(cred_put_u['user'],))
self.post(user_url, data=cred_put_u, expect=204, auth=self.get_normal_credentials())
# can delete a credential directly -- probably won't be used too often
#data = self.delete(url, expect=204, auth=self.get_other_credentials())
data = self.delete(url, expect=404, auth=self.get_other_credentials())
# =====================================================================
# PERMISSIONS
user = self.other_django_user
team = Team.objects.order_by('pk')[0]
organization = Organization.objects.order_by('pk')[0]
inventory = Inventory.objects.create(
name = 'test inventory',
organization = organization,
created_by = self.super_django_user
)
project = Project.objects.order_by('pk')[0]
# can add permissions to a user
user_permission = dict(
name='user can deploy a certain project to a certain inventory',
# user=user.pk, # no need to specify, this will be automatically filled in
inventory=inventory.pk,
project=project.pk,
permission_type=PERM_INVENTORY_DEPLOY,
run_ad_hoc_commands=None,
)
team_permission = dict(
name='team can deploy a certain project to a certain inventory',
# team=team.pk, # no need to specify, this will be automatically filled in
inventory=inventory.pk,
project=project.pk,
permission_type=PERM_INVENTORY_DEPLOY,
)
url = reverse('api:user_permissions_list', args=(user.pk,))
posted = self.post(url, user_permission, expect=201, auth=self.get_super_credentials())
url2 = posted['url']
user_perm_detail = posted['url']
got = self.get(url2, expect=200, auth=self.get_other_credentials())
# cannot add permissions that apply to both team and user
url = reverse('api:user_permissions_list', args=(user.pk,))
user_permission['name'] = 'user permission 2'
user_permission['team'] = team.pk
self.post(url, user_permission, expect=400, auth=self.get_super_credentials())
# cannot set admin/read/write permissions when a project is involved.
user_permission.pop('team')
user_permission['name'] = 'user permission 3'
user_permission['permission_type'] = PERM_INVENTORY_ADMIN
self.post(url, user_permission, expect=400, auth=self.get_super_credentials())
# project is required for a deployment permission
user_permission['name'] = 'user permission 4'
user_permission['permission_type'] = PERM_INVENTORY_DEPLOY
user_permission.pop('project')
self.post(url, user_permission, expect=400, auth=self.get_super_credentials())
# can add permissions on a team
url = reverse('api:team_permissions_list', args=(team.pk,))
posted = self.post(url, team_permission, expect=201, auth=self.get_super_credentials())
url2 = posted['url']
# check we can get that permission back
got = self.get(url2, expect=200, auth=self.get_other_credentials())
# cannot add permissions that apply to both team and user
url = reverse('api:team_permissions_list', args=(team.pk,))
team_permission['name'] += '2'
team_permission['user'] = user.pk
self.post(url, team_permission, expect=400, auth=self.get_super_credentials())
del team_permission['user']
# can list permissions on a user
url = reverse('api:user_permissions_list', args=(user.pk,))
got = self.get(url, expect=200, auth=self.get_super_credentials())
got = self.get(url, expect=200, auth=self.get_other_credentials())
got = self.get(url, expect=403, auth=self.get_nobody_credentials())
# can list permissions on a team
url = reverse('api:team_permissions_list', args=(team.pk,))
got = self.get(url, expect=200, auth=self.get_super_credentials())
got = self.get(url, expect=200, auth=self.get_other_credentials())
got = self.get(url, expect=403, auth=self.get_nobody_credentials())
# can edit a permission -- reducing the permission level
team_permission['permission_type'] = PERM_INVENTORY_CHECK
self.put(url2, team_permission, expect=200, auth=self.get_super_credentials())
self.put(url2, team_permission, expect=403, auth=self.get_other_credentials())
# can remove permissions
# do need to disassociate, just delete it
self.delete(url2, expect=403, auth=self.get_other_credentials())
self.delete(url2, expect=204, auth=self.get_super_credentials())
self.delete(user_perm_detail, expect=204, auth=self.get_super_credentials())
self.delete(url2, expect=404, auth=self.get_other_credentials())
# User is still a team member
self.get(reverse('api:project_detail', args=(project.pk,)), expect=200, auth=self.get_other_credentials())
team.member_role.members.remove(self.other_django_user)
# User is no longer a team member and has no permissions
self.get(reverse('api:project_detail', args=(project.pk,)), expect=403, auth=self.get_other_credentials())
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,