1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

refactored roles handling and added some more tests

This commit is contained in:
Wayne Witzel III 2016-05-24 15:29:28 -04:00
parent 4453e48729
commit aaabc2582d
2 changed files with 89 additions and 44 deletions

View File

@ -107,6 +107,44 @@ def mk_job_template(name, **kwargs):
jt.save() jt.save()
return jt return jt
def apply_roles(roles, objects, persisted):
if roles is None:
return None
if not persisted:
raise RuntimeError('roles can not be used when persisted=False')
all_objects = {}
for d in objects:
for k,v in d.iteritems():
if all_objects.get(k) is not None:
raise KeyError('object names must be unique when using roles {} key already exists with value {}'.format(k,v))
all_objects[k] = v
for role in roles:
obj_role, sep, member_role = role.partition(':')
if not member_role:
raise RuntimeError('you must provide an assignment role, got None')
obj_str, o_role_str = obj_role.split('.')
member_str, m_sep, m_role_str = member_role.partition('.')
obj = all_objects[obj_str]
obj_role = getattr(obj, o_role_str)
member = all_objects[member_str]
if m_role_str:
if hasattr(member, m_role_str):
member_role = getattr(member, m_role_str)
obj_role.parents.add(member_role)
else:
raise RuntimeError('unable to find {} role for {}'.format(m_role_str, member_str))
else:
if type(member) is User:
obj_role.members.add(member)
else:
raise RuntimeError('unable to add non-user {} for members list of {}'.format(member_str, obj_str))
class _Mapped(object): class _Mapped(object):
def __init__(self, d): def __init__(self, d):
@ -151,6 +189,9 @@ def create_job_template(name, **kwargs):
inventory=inv, credential=cred, inventory=inv, credential=cred,
job_type=job_type, persisted=persisted) job_type=job_type, persisted=persisted)
objects = [{o.name: o} for o in [org, proj, inv, cred]]
apply_roles(kwargs.get('roles'), objects, persisted)
return Objects(job_template=jt, return Objects(job_template=jt,
project=proj, project=proj,
inventory=inv, inventory=inv,
@ -158,18 +199,17 @@ def create_job_template(name, **kwargs):
job_type=job_type) job_type=job_type)
def create_organization(name, **kwargs): def create_organization(name, **kwargs):
Objects = namedtuple("Objects", "organization,teams,users,superusers,projects,labels,roles") Objects = namedtuple("Objects", "organization,teams,users,superusers,projects,labels")
org = mk_organization(name, '%s-desc'.format(name))
superusers = {} superusers = {}
users = {} users = {}
teams = {} teams = {}
projects = {} projects = {}
labels = {} labels = {}
roles = {}
persisted = kwargs.get('persisted', True) persisted = kwargs.get('persisted', True)
org = mk_organization(name, '%s-desc'.format(name), persisted=persisted)
if 'teams' in kwargs: if 'teams' in kwargs:
for t in kwargs['teams']: for t in kwargs['teams']:
if type(t) is Team: if type(t) is Team:
@ -217,47 +257,11 @@ def create_organization(name, **kwargs):
else: else:
labels[l] = mk_label(l, org, persisted=persisted) labels[l] = mk_label(l, org, persisted=persisted)
if 'roles' in kwargs: apply_roles(kwargs.get('roles'), [superusers, users, teams, projects, labels], persisted)
# refactor this .. alot
if not persisted:
raise RuntimeError('roles can not be used when persisted=False')
all_objects = {}
for d in [superusers, users, teams, projects, labels]:
for k,v in d.iteritems():
if all_objects.get(k) is not None:
raise KeyError('object names must be unique when using roles \
{} key already exists with value {}'.format(k,v))
all_objects[k] = v
for role in kwargs.get('roles'):
obj_role, sep, member_role = role.partition(':')
if not member_role:
raise RuntimeError('you must an assignment role, got None')
obj_str, o_role_str = obj_role.split('.')
member_str, m_sep, m_role_str = member_role.partition('.')
obj = all_objects[obj_str]
obj_role = getattr(obj, o_role_str)
member = all_objects[member_str]
if m_role_str:
if hasattr(member, m_role_str):
member_role = getattr(member, m_role_str)
obj_role.parents.add(member_role)
else:
raise RuntimeError('unable to find {} role for {}'.format(m_role_str, member_str))
else:
if type(member) is User:
obj_role.members.add(member)
else:
raise RuntimeError('unable to add non-user {} for members list of {}'.format(member_str, obj_str))
return Objects(organization=org, return Objects(organization=org,
superusers=_Mapped(superusers), superusers=_Mapped(superusers),
users=_Mapped(users), users=_Mapped(users),
teams=_Mapped(teams), teams=_Mapped(teams),
projects=_Mapped(projects), projects=_Mapped(projects),
labels=_Mapped(labels), labels=_Mapped(labels))
roles=_Mapped(roles))

View File

@ -1,5 +1,46 @@
import pytest import pytest
def test_roles_exc_not_persisted(organization_factory):
with pytest.raises(RuntimeError) as exc:
organization_factory('test-org', roles=['test-org.admin_role:user1'], persisted=False)
assert 'persisted=False' in str(exc.value)
@pytest.mark.django_db
def test_roles_exc_bad_object(organization_factory):
with pytest.raises(KeyError):
organization_factory('test-org', roles=['test-project.admin_role:user'])
@pytest.mark.django_db
def test_roles_exc_not_unique(organization_factory):
with pytest.raises(KeyError) as exc:
organization_factory('test-org', projects=['foo'], teams=['foo'], roles=['foo.admin_role:user'])
assert 'must be unique' in str(exc.value)
@pytest.mark.django_db
def test_roles_exc_not_assignment(organization_factory):
with pytest.raises(RuntimeError) as exc:
organization_factory('test-org', projects=['foo'], roles=['foo.admin_role'])
assert 'provide an assignment' in str(exc.value)
@pytest.mark.django_db
def test_roles_exc_not_found(organization_factory):
with pytest.raises(RuntimeError) as exc:
organization_factory('test-org', users=['user'], projects=['foo'], roles=['foo.admin_role:user.bad_role'])
assert 'unable to find' in str(exc.value)
@pytest.mark.django_db
def test_roles_exc_not_user(organization_factory):
with pytest.raises(RuntimeError) as exc:
organization_factory('test-org', projects=['foo'], roles=['foo.admin_role:foo'])
assert 'unable to add non-user' in str(exc.value)
@pytest.mark.django_db @pytest.mark.django_db
def test_org_factory_roles(organization_factory): def test_org_factory_roles(organization_factory):
objects = organization_factory('org_roles_test', objects = organization_factory('org_roles_test',
@ -14,7 +55,7 @@ def test_org_factory_roles(organization_factory):
assert objects.users.bar in objects.teams.team1.admin_role assert objects.users.bar in objects.teams.team1.admin_role
assert objects.users.foo in objects.projects.baz.admin_role assert objects.users.foo in objects.projects.baz.admin_role
assert objects.users.foo in objects.teams.team1.member_role assert objects.users.foo in objects.teams.team1.member_role
assert objects.teams.team2.admin_role in objects.teams.team1n.admin_role.parents.all() assert objects.teams.team2.admin_role in objects.teams.team1.admin_role.parents.all()
@pytest.mark.django_db @pytest.mark.django_db