mirror of
https://github.com/ansible/awx.git
synced 2024-11-02 18:21:12 +03:00
commit
e237648f4c
@ -1239,15 +1239,15 @@ class CredentialList(ListCreateAPIView):
|
|||||||
|
|
||||||
if 'user' in request.data:
|
if 'user' in request.data:
|
||||||
user = User.objects.get(pk=request.data['user'])
|
user = User.objects.get(pk=request.data['user'])
|
||||||
obj = user
|
can_add_params = {'user': user.id}
|
||||||
if 'team' in request.data:
|
if 'team' in request.data:
|
||||||
team = Team.objects.get(pk=request.data['team'])
|
team = Team.objects.get(pk=request.data['team'])
|
||||||
obj = team
|
can_add_params = {'team': team.id}
|
||||||
if 'organization' in request.data:
|
if 'organization' in request.data:
|
||||||
organization = Organization.objects.get(pk=request.data['organization'])
|
organization = Organization.objects.get(pk=request.data['organization'])
|
||||||
obj = organization
|
can_add_params = {'organization': organization.id}
|
||||||
|
|
||||||
if not self.request.user.can_access(type(obj), 'change', obj, request.data):
|
if not self.request.user.can_access(Credential, 'add', can_add_params):
|
||||||
raise PermissionDenied()
|
raise PermissionDenied()
|
||||||
|
|
||||||
ret = super(CredentialList, self).post(request, *args, **kwargs)
|
ret = super(CredentialList, self).post(request, *args, **kwargs)
|
||||||
@ -1277,8 +1277,7 @@ class UserCredentialsList(CredentialList):
|
|||||||
return user_creds & visible_creds
|
return user_creds & visible_creds
|
||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
user = User.objects.get(pk=self.kwargs['pk'])
|
request.data['user'] = self.kwargs['pk']
|
||||||
request.data['user'] = user.id
|
|
||||||
# The following post takes care of ensuring the current user can add a cred to this user
|
# The following post takes care of ensuring the current user can add a cred to this user
|
||||||
return super(UserCredentialsList, self).post(request, args, kwargs)
|
return super(UserCredentialsList, self).post(request, args, kwargs)
|
||||||
|
|
||||||
@ -1297,8 +1296,7 @@ class TeamCredentialsList(CredentialList):
|
|||||||
return team_creds & visible_creds
|
return team_creds & visible_creds
|
||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
team = Team.objects.get(pk=self.kwargs['pk'])
|
request.data['team'] = self.kwargs['pk']
|
||||||
request.data['team'] = team.id
|
|
||||||
# The following post takes care of ensuring the current user can add a cred to this user
|
# The following post takes care of ensuring the current user can add a cred to this user
|
||||||
return super(TeamCredentialsList, self).post(request, args, kwargs)
|
return super(TeamCredentialsList, self).post(request, args, kwargs)
|
||||||
|
|
||||||
|
@ -349,7 +349,7 @@ class InventoryAccess(BaseAccess):
|
|||||||
if self.user not in org.admin_role:
|
if self.user not in org.admin_role:
|
||||||
return False
|
return False
|
||||||
# Otherwise, just check for write permission.
|
# Otherwise, just check for write permission.
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.update_role
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
@ -401,7 +401,7 @@ class HostAccess(BaseAccess):
|
|||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
if self.user not in inventory.admin_role:
|
if self.user not in inventory.update_role:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Check to see if we have enough licenses
|
# Check to see if we have enough licenses
|
||||||
@ -415,7 +415,7 @@ class HostAccess(BaseAccess):
|
|||||||
raise PermissionDenied('Unable to change inventory on a host')
|
raise PermissionDenied('Unable to change inventory on a host')
|
||||||
# Checks for admin or change permission on inventory, controls whether
|
# Checks for admin or change permission on inventory, controls whether
|
||||||
# the user can edit variable data.
|
# the user can edit variable data.
|
||||||
return obj and self.user in obj.inventory.admin_role
|
return obj and self.user in obj.inventory.update_role
|
||||||
|
|
||||||
def can_attach(self, obj, sub_obj, relationship, data,
|
def can_attach(self, obj, sub_obj, relationship, data,
|
||||||
skip_sub_obj_read_check=False):
|
skip_sub_obj_read_check=False):
|
||||||
@ -452,7 +452,7 @@ class GroupAccess(BaseAccess):
|
|||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||||
return self.user in inventory.admin_role
|
return self.user in inventory.update_role
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a group to a different inventory.
|
# Prevent moving a group to a different inventory.
|
||||||
@ -461,7 +461,7 @@ class GroupAccess(BaseAccess):
|
|||||||
raise PermissionDenied('Unable to change inventory on a group')
|
raise PermissionDenied('Unable to change inventory on a group')
|
||||||
# Checks for admin or change permission on inventory, controls whether
|
# Checks for admin or change permission on inventory, controls whether
|
||||||
# the user can attach subgroups or edit variable data.
|
# the user can attach subgroups or edit variable data.
|
||||||
return obj and self.user in obj.inventory.admin_role
|
return obj and self.user in obj.inventory.update_role
|
||||||
|
|
||||||
def can_attach(self, obj, sub_obj, relationship, data,
|
def can_attach(self, obj, sub_obj, relationship, data,
|
||||||
skip_sub_obj_read_check=False):
|
skip_sub_obj_read_check=False):
|
||||||
@ -514,7 +514,7 @@ class InventorySourceAccess(BaseAccess):
|
|||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Checks for admin or change permission on group.
|
# Checks for admin or change permission on group.
|
||||||
if obj and obj.group:
|
if obj and obj.group:
|
||||||
return self.user in obj.group.admin_role
|
return self.user in obj.group.update_role
|
||||||
# Can't change inventory sources attached to only the inventory, since
|
# Can't change inventory sources attached to only the inventory, since
|
||||||
# these are created automatically from the management command.
|
# these are created automatically from the management command.
|
||||||
else:
|
else:
|
||||||
@ -572,8 +572,22 @@ class CredentialAccess(BaseAccess):
|
|||||||
return self.user in obj.read_role
|
return self.user in obj.read_role
|
||||||
|
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
# Access enforced in our view where we have context enough to make a decision
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
|
user_pk = get_pk_from_dict(data, 'user')
|
||||||
|
if user_pk:
|
||||||
|
user_obj = get_object_or_400(User, pk=user_pk)
|
||||||
|
return check_user_access(self.user, User, 'change', user_obj, None)
|
||||||
|
team_pk = get_pk_from_dict(data, 'team')
|
||||||
|
if team_pk:
|
||||||
|
team_obj = get_object_or_400(Team, pk=team_pk)
|
||||||
|
return check_user_access(self.user, Team, 'change', team_obj, None)
|
||||||
|
organization_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
if organization_pk:
|
||||||
|
organization_obj = get_object_or_400(Organization, pk=organization_pk)
|
||||||
|
return check_user_access(self.user, Organization, 'change', organization_obj, None)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_use(self, obj):
|
def can_use(self, obj):
|
||||||
@ -581,6 +595,8 @@ class CredentialAccess(BaseAccess):
|
|||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
|
if not self.can_add(data):
|
||||||
|
return False
|
||||||
return self.user in obj.owner_role
|
return self.user in obj.owner_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@ -615,12 +631,13 @@ class TeamAccess(BaseAccess):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@check_superuser
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a team to a different organization.
|
# Prevent moving a team to a different organization.
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
if obj and org_pk and obj.organization.pk != org_pk:
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
raise PermissionDenied('Unable to change organization on a team')
|
raise PermissionDenied('Unable to change organization on a team')
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@ -662,7 +679,6 @@ class ProjectAccess(BaseAccess):
|
|||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
return self.can_change(obj, None)
|
return self.can_change(obj, None)
|
||||||
|
|
||||||
@check_superuser
|
|
||||||
def can_start(self, obj):
|
def can_start(self, obj):
|
||||||
return self.can_change(obj, {}) and obj.can_update
|
return self.can_change(obj, {}) and obj.can_update
|
||||||
|
|
||||||
@ -715,8 +731,7 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
'credential', 'cloud_credential', 'next_schedule').all()
|
'credential', 'cloud_credential', 'next_schedule').all()
|
||||||
|
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
# you can only see the job templates that you have permission to launch.
|
return self.user in obj.read_role
|
||||||
return self.can_start(obj, validate_license=False)
|
|
||||||
|
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
'''
|
'''
|
||||||
|
@ -21,5 +21,6 @@ class Migration(migrations.Migration):
|
|||||||
migrations.RunPython(rbac.migrate_inventory),
|
migrations.RunPython(rbac.migrate_inventory),
|
||||||
migrations.RunPython(rbac.migrate_projects),
|
migrations.RunPython(rbac.migrate_projects),
|
||||||
migrations.RunPython(rbac.migrate_credential),
|
migrations.RunPython(rbac.migrate_credential),
|
||||||
|
migrations.RunPython(rbac.migrate_job_templates),
|
||||||
migrations.RunPython(rbac.rebuild_role_hierarchy),
|
migrations.RunPython(rbac.rebuild_role_hierarchy),
|
||||||
]
|
]
|
||||||
|
@ -206,9 +206,9 @@ class UserAccess(BaseAccess):
|
|||||||
return qs
|
return qs
|
||||||
return qs.filter(
|
return qs.filter(
|
||||||
Q(pk=self.user.pk) |
|
Q(pk=self.user.pk) |
|
||||||
Q(organizations__in=self.user.deprecated_admin_of_organizations) |
|
Q(deprecated_organizations__in=self.user.deprecated_admin_of_organizations.all()) |
|
||||||
Q(organizations__in=self.user.deprecated_organizations) |
|
Q(deprecated_organizations__in=self.user.deprecated_organizations.all()) |
|
||||||
Q(deprecated_teams__in=self.user.deprecated_teams)
|
Q(deprecated_teams__in=self.user.deprecated_teams.all())
|
||||||
).distinct()
|
).distinct()
|
||||||
|
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
@ -563,18 +563,18 @@ class CredentialAccess(BaseAccess):
|
|||||||
# If the user is a superuser, and therefore can see everything, this
|
# If the user is a superuser, and therefore can see everything, this
|
||||||
# is also sufficient, and we are done.
|
# is also sufficient, and we are done.
|
||||||
qs = self.model.objects.distinct()
|
qs = self.model.objects.distinct()
|
||||||
qs = qs.select_related('created_by', 'modified_by', 'user', 'team')
|
qs = qs.select_related('created_by', 'modified_by')
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return qs
|
return qs
|
||||||
|
|
||||||
# Get the list of organizations for which the user is an admin
|
# Get the list of organizations for which the user is an admin
|
||||||
orgs_as_admin_ids = set(self.user.deprecated_admin_of_organizations.values_list('id', flat=True))
|
orgs_as_admin_ids = set(self.user.deprecated_admin_of_organizations.values_list('id', flat=True))
|
||||||
return qs.filter(
|
return qs.filter(
|
||||||
Q(user=self.user) |
|
Q(deprecated_user=self.user) |
|
||||||
Q(user__deprecated_organizations__id__in=orgs_as_admin_ids) |
|
Q(deprecated_user__deprecated_organizations__id__in=orgs_as_admin_ids) |
|
||||||
Q(user__deprecated_admin_of_organizations__id__in=orgs_as_admin_ids) |
|
Q(deprecated_user__deprecated_admin_of_organizations__id__in=orgs_as_admin_ids) |
|
||||||
Q(team__organization__id__in=orgs_as_admin_ids) |
|
Q(deprecated_team__organization__id__in=orgs_as_admin_ids) |
|
||||||
Q(team__deprecated_users__in=[self.user])
|
Q(deprecated_team__deprecated_users__in=[self.user])
|
||||||
)
|
)
|
||||||
|
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
@ -597,22 +597,22 @@ class CredentialAccess(BaseAccess):
|
|||||||
return False
|
return False
|
||||||
if self.user == obj.created_by:
|
if self.user == obj.created_by:
|
||||||
return True
|
return True
|
||||||
if obj.user:
|
if obj.deprecated_user:
|
||||||
if self.user == obj.user:
|
if self.user == obj.deprecated_user:
|
||||||
return True
|
return True
|
||||||
if obj.user.deprecated_organizations.filter(deprecated_admins__in=[self.user]).exists():
|
if obj.deprecated_user.deprecated_organizations.filter(deprecated_admins__in=[self.user]).exists():
|
||||||
return True
|
return True
|
||||||
if obj.user.deprecated_admin_of_organizations.filter(deprecated_admins__in=[self.user]).exists():
|
if obj.deprecated_user.deprecated_admin_of_organizations.filter(deprecated_admins__in=[self.user]).exists():
|
||||||
return True
|
return True
|
||||||
if obj.team:
|
if obj.deprecated_team:
|
||||||
if self.user in obj.team.organization.deprecated_admins.all():
|
if self.user in obj.deprecated_team.organization.deprecated_admins.all():
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
# Unassociated credentials may be marked deleted by anyone, though we
|
# Unassociated credentials may be marked deleted by anyone, though we
|
||||||
# shouldn't ever end up with those.
|
# shouldn't ever end up with those.
|
||||||
if obj.user is None and obj.team is None:
|
if obj.deprecated_user is None and obj.deprecated_team is None:
|
||||||
return True
|
return True
|
||||||
return self.can_change(obj, None)
|
return self.can_change(obj, None)
|
||||||
|
|
||||||
|
@ -125,8 +125,6 @@ def attrfunc(attr_path):
|
|||||||
|
|
||||||
def _update_credential_parents(org, cred):
|
def _update_credential_parents(org, cred):
|
||||||
org.admin_role.children.add(cred.owner_role)
|
org.admin_role.children.add(cred.owner_role)
|
||||||
org.member_role.children.add(cred.use_role)
|
|
||||||
cred.deprecated_user, cred.deprecated_team = None, None
|
|
||||||
cred.save()
|
cred.save()
|
||||||
|
|
||||||
def _discover_credentials(instances, cred, orgfunc):
|
def _discover_credentials(instances, cred, orgfunc):
|
||||||
@ -158,7 +156,6 @@ def _discover_credentials(instances, cred, orgfunc):
|
|||||||
cred.save()
|
cred.save()
|
||||||
|
|
||||||
# Unlink the old information from the new credential
|
# Unlink the old information from the new credential
|
||||||
cred.deprecated_user, cred.deprecated_team = None, None
|
|
||||||
cred.owner_role, cred.use_role = None, None
|
cred.owner_role, cred.use_role = None, None
|
||||||
cred.save()
|
cred.save()
|
||||||
|
|
||||||
@ -172,42 +169,32 @@ def migrate_credential(apps, schema_editor):
|
|||||||
Credential = apps.get_model('main', "Credential")
|
Credential = apps.get_model('main', "Credential")
|
||||||
JobTemplate = apps.get_model('main', 'JobTemplate')
|
JobTemplate = apps.get_model('main', 'JobTemplate')
|
||||||
Project = apps.get_model('main', 'Project')
|
Project = apps.get_model('main', 'Project')
|
||||||
Role = apps.get_model('main', 'Role')
|
|
||||||
User = apps.get_model('auth', 'User')
|
|
||||||
InventorySource = apps.get_model('main', 'InventorySource')
|
InventorySource = apps.get_model('main', 'InventorySource')
|
||||||
ContentType = apps.get_model('contenttypes', "ContentType")
|
|
||||||
user_content_type = ContentType.objects.get_for_model(User)
|
|
||||||
|
|
||||||
for cred in Credential.objects.iterator():
|
for cred in Credential.objects.iterator():
|
||||||
results = (JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all() or
|
results = [x for x in JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all()] + \
|
||||||
InventorySource.objects.filter(credential=cred).all())
|
[x for x in InventorySource.objects.filter(credential=cred).all()]
|
||||||
if results:
|
if cred.deprecated_team is not None and results:
|
||||||
if len(results) == 1:
|
if len(results) == 1:
|
||||||
_update_credential_parents(results[0].inventory.organization, cred)
|
_update_credential_parents(results[0].inventory.organization, cred)
|
||||||
else:
|
else:
|
||||||
_discover_credentials(results, cred, attrfunc('inventory.organization'))
|
_discover_credentials(results, cred, attrfunc('inventory.organization'))
|
||||||
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at organization level".format(cred.name, cred.kind, cred.host)))
|
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at organization level".format(cred.name, cred.kind, cred.host)))
|
||||||
continue
|
|
||||||
|
|
||||||
projs = Project.objects.filter(credential=cred).all()
|
projs = Project.objects.filter(credential=cred).all()
|
||||||
if projs:
|
if cred.deprecated_team is not None and projs:
|
||||||
if len(projs) == 1:
|
if len(projs) == 1:
|
||||||
_update_credential_parents(projs[0].organization, cred)
|
_update_credential_parents(projs[0].organization, cred)
|
||||||
else:
|
else:
|
||||||
_discover_credentials(projs, cred, attrfunc('organization'))
|
_discover_credentials(projs, cred, attrfunc('organization'))
|
||||||
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at organization level".format(cred.name, cred.kind, cred.host)))
|
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at organization level".format(cred.name, cred.kind, cred.host)))
|
||||||
continue
|
|
||||||
|
|
||||||
if cred.deprecated_team is not None:
|
if cred.deprecated_team is not None:
|
||||||
cred.deprecated_team.admin_role.children.add(cred.owner_role)
|
cred.deprecated_team.member_role.children.add(cred.owner_role)
|
||||||
cred.deprecated_team.member_role.children.add(cred.use_role)
|
|
||||||
cred.deprecated_user, cred.deprecated_team = None, None
|
|
||||||
cred.save()
|
cred.save()
|
||||||
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at user level".format(cred.name, cred.kind, cred.host)))
|
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at user level".format(cred.name, cred.kind, cred.host)))
|
||||||
elif cred.deprecated_user is not None:
|
elif cred.deprecated_user is not None:
|
||||||
user_admin_role = Role.objects.get(content_type=user_content_type, object_id=cred.deprecated_user.id)
|
cred.owner_role.members.add(cred.deprecated_user)
|
||||||
user_admin_role.children.add(cred.owner_role)
|
|
||||||
cred.deprecated_user, cred.deprecated_team = None, None
|
|
||||||
cred.save()
|
cred.save()
|
||||||
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at user level".format(cred.name, cred.kind, cred.host, )))
|
logger.info(smart_text(u"added Credential(name={}, kind={}, host={}) at user level".format(cred.name, cred.kind, cred.host, )))
|
||||||
else:
|
else:
|
||||||
@ -408,7 +395,7 @@ def migrate_job_templates(apps, schema_editor):
|
|||||||
jt.execute_role.members.add(user)
|
jt.execute_role.members.add(user)
|
||||||
logger.info(smart_text(u'adding User({}) access to JobTemplate({})'.format(user.username, jt.name)))
|
logger.info(smart_text(u'adding User({}) access to JobTemplate({})'.format(user.username, jt.name)))
|
||||||
|
|
||||||
if user in jt.execute_role:
|
if jt.execute_role.ancestors.filter(members=user).exists(): # aka "user in jt.execute_role"
|
||||||
# If the job template is already accessible by the user, because they
|
# If the job template is already accessible by the user, because they
|
||||||
# are a sytem, organization, or project admin, then don't add an explicit
|
# are a sytem, organization, or project admin, then don't add an explicit
|
||||||
# role entry for them
|
# role entry for them
|
||||||
|
@ -24,6 +24,7 @@ def test_create_user_credential_via_credentials_list(post, get, alice):
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_user_credential_via_user_credentials_list(post, get, alice):
|
def test_create_user_credential_via_user_credentials_list(post, get, alice):
|
||||||
response = post(reverse('api:user_credentials_list', args=(alice.pk,)), {
|
response = post(reverse('api:user_credentials_list', args=(alice.pk,)), {
|
||||||
|
'user': alice.pk,
|
||||||
'name': 'Some name',
|
'name': 'Some name',
|
||||||
'username': 'someusername',
|
'username': 'someusername',
|
||||||
}, alice)
|
}, alice)
|
||||||
@ -45,6 +46,7 @@ def test_create_user_credential_via_credentials_list_xfail(post, alice, bob):
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob):
|
def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob):
|
||||||
response = post(reverse('api:user_credentials_list', args=(bob.pk,)), {
|
response = post(reverse('api:user_credentials_list', args=(bob.pk,)), {
|
||||||
|
'user': bob.pk,
|
||||||
'name': 'Some name',
|
'name': 'Some name',
|
||||||
'username': 'someusername'
|
'username': 'someusername'
|
||||||
}, alice)
|
}, alice)
|
||||||
@ -71,6 +73,7 @@ def test_create_team_credential(post, get, team, org_admin, team_member):
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member):
|
def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member):
|
||||||
response = post(reverse('api:team_credentials_list', args=(team.pk,)), {
|
response = post(reverse('api:team_credentials_list', args=(team.pk,)), {
|
||||||
|
'team': team.pk,
|
||||||
'name': 'Some name',
|
'name': 'Some name',
|
||||||
'username': 'someusername',
|
'username': 'someusername',
|
||||||
}, org_admin)
|
}, org_admin)
|
||||||
|
@ -27,7 +27,7 @@ def test_credential_use_role(credential, user, permissions):
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_credential_migration_team_member(credential, team, user, permissions):
|
def test_credential_migration_team_member(credential, team, user, permissions):
|
||||||
u = user('user', False)
|
u = user('user', False)
|
||||||
team.admin_role.members.add(u)
|
team.member_role.members.add(u)
|
||||||
credential.deprecated_team = team
|
credential.deprecated_team = team
|
||||||
credential.save()
|
credential.save()
|
||||||
|
|
||||||
@ -91,7 +91,8 @@ def test_credential_access_admin(user, team, credential):
|
|||||||
assert access.can_change(credential, {'user': u.pk})
|
assert access.can_change(credential, {'user': u.pk})
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_cred_job_template(user, deploy_jobtemplate):
|
def test_cred_job_template_xfail(user, deploy_jobtemplate):
|
||||||
|
' Personal credential migration '
|
||||||
a = user('admin', False)
|
a = user('admin', False)
|
||||||
org = deploy_jobtemplate.project.organization
|
org = deploy_jobtemplate.project.organization
|
||||||
org.admin_role.members.add(a)
|
org.admin_role.members.add(a)
|
||||||
@ -102,19 +103,17 @@ def test_cred_job_template(user, deploy_jobtemplate):
|
|||||||
|
|
||||||
access = CredentialAccess(a)
|
access = CredentialAccess(a)
|
||||||
rbac.migrate_credential(apps, None)
|
rbac.migrate_credential(apps, None)
|
||||||
assert access.can_change(cred, {'organization': org.pk})
|
|
||||||
|
|
||||||
org.admin_role.members.remove(a)
|
|
||||||
assert not access.can_change(cred, {'organization': org.pk})
|
assert not access.can_change(cred, {'organization': org.pk})
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_cred_multi_job_template_single_org(user, deploy_jobtemplate):
|
def test_cred_job_template(user, team, deploy_jobtemplate):
|
||||||
|
' Team credential migration => org credential '
|
||||||
a = user('admin', False)
|
a = user('admin', False)
|
||||||
org = deploy_jobtemplate.project.organization
|
org = deploy_jobtemplate.project.organization
|
||||||
org.admin_role.members.add(a)
|
org.admin_role.members.add(a)
|
||||||
|
|
||||||
cred = deploy_jobtemplate.credential
|
cred = deploy_jobtemplate.credential
|
||||||
cred.deprecated_user = user('john', False)
|
cred.deprecated_team = team
|
||||||
cred.save()
|
cred.save()
|
||||||
|
|
||||||
access = CredentialAccess(a)
|
access = CredentialAccess(a)
|
||||||
@ -125,8 +124,42 @@ def test_cred_multi_job_template_single_org(user, deploy_jobtemplate):
|
|||||||
assert not access.can_change(cred, {'organization': org.pk})
|
assert not access.can_change(cred, {'organization': org.pk})
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_single_cred_multi_job_template_multi_org(user, organizations, credential):
|
def test_cred_multi_job_template_single_org_xfail(user, deploy_jobtemplate):
|
||||||
|
a = user('admin', False)
|
||||||
|
org = deploy_jobtemplate.project.organization
|
||||||
|
org.admin_role.members.add(a)
|
||||||
|
|
||||||
|
cred = deploy_jobtemplate.credential
|
||||||
|
cred.deprecated_user = user('john', False)
|
||||||
|
cred.save()
|
||||||
|
|
||||||
|
access = CredentialAccess(a)
|
||||||
|
rbac.migrate_credential(apps, None)
|
||||||
|
assert not access.can_change(cred, {'organization': org.pk})
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_cred_multi_job_template_single_org(user, team, deploy_jobtemplate):
|
||||||
|
a = user('admin', False)
|
||||||
|
org = deploy_jobtemplate.project.organization
|
||||||
|
org.admin_role.members.add(a)
|
||||||
|
|
||||||
|
cred = deploy_jobtemplate.credential
|
||||||
|
cred.deprecated_team = team
|
||||||
|
cred.save()
|
||||||
|
|
||||||
|
access = CredentialAccess(a)
|
||||||
|
rbac.migrate_credential(apps, None)
|
||||||
|
assert access.can_change(cred, {'organization': org.pk})
|
||||||
|
|
||||||
|
org.admin_role.members.remove(a)
|
||||||
|
assert not access.can_change(cred, {'organization': org.pk})
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_single_cred_multi_job_template_multi_org(user, organizations, credential, team):
|
||||||
orgs = organizations(2)
|
orgs = organizations(2)
|
||||||
|
credential.deprecated_team = team
|
||||||
|
credential.save()
|
||||||
|
|
||||||
jts = []
|
jts = []
|
||||||
for org in orgs:
|
for org in orgs:
|
||||||
inv = org.inventories.create(name="inv-%d" % org.pk)
|
inv = org.inventories.create(name="inv-%d" % org.pk)
|
||||||
@ -169,7 +202,7 @@ def test_cred_inventory_source(user, inventory, credential):
|
|||||||
assert u not in credential.use_role
|
assert u not in credential.use_role
|
||||||
|
|
||||||
rbac.migrate_credential(apps, None)
|
rbac.migrate_credential(apps, None)
|
||||||
assert u in credential.use_role
|
assert u not in credential.use_role
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_cred_project(user, credential, project):
|
def test_cred_project(user, credential, project):
|
||||||
@ -181,7 +214,7 @@ def test_cred_project(user, credential, project):
|
|||||||
assert u not in credential.use_role
|
assert u not in credential.use_role
|
||||||
|
|
||||||
rbac.migrate_credential(apps, None)
|
rbac.migrate_credential(apps, None)
|
||||||
assert u in credential.use_role
|
assert u not in credential.use_role
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_cred_no_org(user, credential):
|
def test_cred_no_org(user, credential):
|
||||||
|
Loading…
Reference in New Issue
Block a user