1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge branch 'rbac' of github.com:ansible/ansible-tower into rbac

This commit is contained in:
Akita Noek 2016-03-18 11:17:04 -04:00
commit 13dd27ac52
5 changed files with 228 additions and 49 deletions

View File

@ -57,20 +57,6 @@ access_registry = {
}
def user_or_team(data):
try:
if 'user' in data:
pk = get_pk_from_dict(data, 'user')
return get_object_or_400(User, pk=pk), None
elif 'team' in data:
pk = get_pk_from_dict(data, 'team')
return None, get_object_or_400(Team, pk=pk)
else:
return None, None
except ParseError:
return None, None
def register_access(model_class, access_class):
access_classes = access_registry.setdefault(model_class, [])
access_classes.append(access_class)
@ -557,27 +543,27 @@ class CredentialAccess(BaseAccess):
if self.user.is_superuser:
return True
user, team = user_or_team(data)
if user is None and team is None:
return False
if user is not None:
if 'user' in data:
pk = get_pk_from_dict(data, 'user')
user = get_object_or_400(User, pk=pk)
return user.accessible_by(self.user, {'write': True})
if team is not None:
return team.accessible_by(self.user, {'write':True})
elif 'organization' in data:
pk = get_pk_from_dict(data, 'organization')
org = get_object_or_400(Organization, pk=pk)
return org.accessible_by(self.user, {'write': True})
return False
def can_change(self, obj, data):
if self.user.is_superuser:
return True
if not self.can_add(data):
return False
return obj.accessible_by(self.user, {'read':True, 'update': True, 'delete':True})
def can_delete(self, obj):
# Unassociated credentials may be marked deleted by anyone, though we
# shouldn't ever end up with those.
if obj.user is None and obj.team is None:
return True
#if obj.user is None and obj.team is None:
# return True
return self.can_change(obj, None)
class TeamAccess(BaseAccess):

View File

@ -14,8 +14,8 @@ class Migration(migrations.Migration):
operations = [
migrations.RunPython(rbac.migrate_users),
migrations.RunPython(rbac.migrate_organization),
migrations.RunPython(rbac.migrate_credential),
migrations.RunPython(rbac.migrate_team),
migrations.RunPython(rbac.migrate_inventory),
migrations.RunPython(rbac.migrate_projects),
migrations.RunPython(rbac.migrate_credential),
]

View File

@ -1,6 +1,8 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from collections import defaultdict
from awx.main.utils import getattrd
import _old_access as old_access
def migrate_users(apps, schema_editor):
@ -52,18 +54,104 @@ def migrate_team(apps, schema_editor):
migrations[t.name].append(user)
return migrations
def attrfunc(attr_path):
'''attrfunc returns a function that will
attempt to use the attr_path to access the attribute
of an instance that is passed in to the returned function.
Example:
get_org = attrfunc('inventory.organization')
org = get_org(JobTemplateInstance)
'''
def attr(inst):
return getattrd(inst, attr_path)
return attr
def _update_credential_parents(org, cred):
org.admin_role.children.add(cred.owner_role)
org.member_role.children.add(cred.usage_role)
cred.user, cred.team = None, None
cred.save()
def _discover_credentials(instances, cred, orgfunc):
'''_discover_credentials will find shared credentials across
organizations. If a shared credential is found, it will duplicate
the credential, ensure the proper role permissions are added to the new
credential, and update any references from the old to the newly created
credential.
instances is a list of all objects that were matched when filtered
with cred.
orgfunc is a function that when called with an instance from instances
will produce an Organization object.
'''
orgs = defaultdict(list)
for inst in instances:
orgs[orgfunc(inst)].append(inst)
if len(orgs) == 1:
_update_credential_parents(instances[0].inventory.organization, cred)
else:
for pos, org in enumerate(orgs):
if pos == 0:
_update_credential_parents(org, cred)
else:
# Create a new credential
cred.pk = None
cred.save()
# Unlink the old information from the new credential
cred.user, cred.team = None, None
cred.owner_role, cred.usage_role = None, None
cred.save()
for i in orgs[org]:
i.credential = cred
i.save()
_update_credential_parents(org, cred)
def migrate_credential(apps, schema_editor):
migrations = defaultdict(list)
credential = apps.get_model('main', "Credential")
for cred in credential.objects.all():
if cred.user:
cred.owner_role.members.add(cred.user)
migrations[cred.name].append(cred.user)
elif cred.team:
cred.owner_role.parents.add(cred.team.admin_role)
cred.usage_role.parents.add(cred.team.member_role)
migrations[cred.name].append(cred.team)
return migrations
Credential = apps.get_model('main', "Credential")
JobTemplate = apps.get_model('main', 'JobTemplate')
Project = apps.get_model('main', 'Project')
InventorySource = apps.get_model('main', 'InventorySource')
migrated = []
for cred in Credential.objects.all():
migrated.append(cred)
results = (JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all() or
InventorySource.objects.filter(credential=cred).all())
if results:
if len(results) == 1:
_update_credential_parents(results[0].inventory.organization, cred)
else:
_discover_credentials(results, cred, attrfunc('inventory.organization'))
continue
projs = Project.objects.filter(credential=cred).all()
if projs:
if len(projs) == 1:
_update_credential_parents(projs[0].organization, cred)
else:
_discover_credentials(projs, cred, attrfunc('organization'))
continue
if cred.team is not None:
cred.team.admin_role.children.add(cred.owner_role)
cred.team.member_role.children.add(cred.usage_role)
cred.user, cred.team = None, None
cred.save()
elif cred.user is not None:
cred.user.admin_role.children.add(cred.owner_role)
cred.user, cred.team = None, None
cred.save()
# no match found, log
return migrated
def migrate_inventory(apps, schema_editor):
migrations = defaultdict(dict)

View File

@ -163,8 +163,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
role_name='Credential Owner',
role_description='Owner of the credential',
parent_role=[
'team.admin_role',
'user.admin_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
],
permissions = {'all': True}
@ -180,8 +178,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
usage_role = ImplicitRoleField(
role_name='Credential User',
role_description='May use this credential, but not read sensitive portions or modify it',
parent_role= 'team.member_role',
permissions = {'read': True, 'use': True}
permissions = {'use': True}
)
@property

View File

@ -2,6 +2,8 @@ import pytest
from awx.main.access import CredentialAccess
from awx.main.models.credential import Credential
from awx.main.models.jobs import JobTemplate
from awx.main.models.inventory import InventorySource
from awx.main.migrations import _rbac as rbac
from django.apps import apps
from django.contrib.auth.models import User
@ -49,9 +51,6 @@ def test_credential_migration_team_admin(credential, team, user, permissions):
credential.team = team
credential.save()
# No permissions pre-migration
team.admin_role.children.remove(credential.owner_role)
team.member_role.children.remove(credential.usage_role)
assert not credential.accessible_by(u, permissions['usage'])
# Usage permissions post migration
@ -76,17 +75,15 @@ def test_credential_access_admin(user, team, credential):
access = CredentialAccess(u)
assert access.can_add({'user': u.pk})
assert access.can_add({'team': team.pk})
assert not access.can_change(credential, {'user': u.pk})
# unowned credential can be deleted
assert access.can_delete(credential)
# unowned credential is superuser only
assert not access.can_delete(credential)
# credential is now part of a team
# that is part of an organization
# that I am an admin for
credential.team = team
credential.owner_role.parents.add(team.admin_role)
credential.save()
credential.owner_role.rebuild_role_ancestor_list()
@ -96,3 +93,114 @@ def test_credential_access_admin(user, team, credential):
# should have can_change access as org-admin
assert access.can_change(credential, {'user': u.pk})
@pytest.mark.django_db
def test_cred_job_template(user, deploy_jobtemplate):
a = user('admin', False)
org = deploy_jobtemplate.project.organization
org.admin_role.members.add(a)
cred = deploy_jobtemplate.credential
cred.user = user('john', False)
cred.save()
access = CredentialAccess(a)
rbac.migrate_credential(apps, None)
assert access.can_change(cred, {'organization': org.pk})
org.admin_role.members.remove(a)
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_multi_job_template_single_org(user, deploy_jobtemplate):
a = user('admin', False)
org = deploy_jobtemplate.project.organization
org.admin_role.members.add(a)
cred = deploy_jobtemplate.credential
cred.user = user('john', False)
cred.save()
access = CredentialAccess(a)
rbac.migrate_credential(apps, None)
assert access.can_change(cred, {'organization': org.pk})
org.admin_role.members.remove(a)
assert not access.can_change(cred, {'organization': org.pk})
@pytest.mark.django_db
def test_single_cred_multi_job_template_multi_org(user, organizations, credential):
orgs = organizations(2)
jts = []
for org in orgs:
inv = org.inventories.create(name="inv-%d" % org.pk)
jt = JobTemplate.objects.create(
inventory=inv,
credential=credential,
name="test-jt-org-%d" % org.pk,
job_type='check',
)
jts.append(jt)
a = user('admin', False)
orgs[0].admin_role.members.add(a)
orgs[1].admin_role.members.add(a)
access = CredentialAccess(a)
rbac.migrate_credential(apps, None)
for jt in jts:
jt.refresh_from_db()
assert jts[0].credential != jts[1].credential
assert access.can_change(jts[0].credential, {'organization': org.pk})
assert access.can_change(jts[1].credential, {'organization': org.pk})
orgs[0].admin_role.members.remove(a)
assert not access.can_change(jts[0].credential, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_inventory_source(user, inventory, credential):
u = user('member', False)
inventory.organization.member_role.members.add(u)
InventorySource.objects.create(
name="test-inv-src",
credential=credential,
inventory=inventory,
)
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})
@pytest.mark.django_db
def test_cred_project(user, credential, project):
u = user('member', False)
project.organization.member_role.members.add(u)
project.credential = credential
project.save()
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})
@pytest.mark.django_db
def test_cred_no_org(user, credential):
su = user('su', True)
access = CredentialAccess(su)
assert access.can_change(credential, {'user': su.pk})
@pytest.mark.django_db
def test_cred_team(user, team, credential):
u = user('a', False)
team.member_role.members.add(u)
credential.team = team
credential.save()
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})