1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 16:51:11 +03:00

job template creation to require use_role, include network creds

This commit is contained in:
AlanCoding 2016-06-09 11:13:34 -04:00
parent 74b6ea82fb
commit a431f16953
4 changed files with 95 additions and 13 deletions

View File

@ -757,28 +757,35 @@ class JobTemplateAccess(BaseAccess):
if self.user.is_superuser:
return True
# If a credential is provided, the user should have read access to it.
# If a credential is provided, the user should have use access to it.
credential_pk = get_pk_from_dict(data, 'credential')
if credential_pk:
credential = get_object_or_400(Credential, pk=credential_pk)
if self.user not in credential.read_role:
if self.user not in credential.use_role:
return False
# If a cloud credential is provided, the user should have read access.
# If a cloud credential is provided, the user should have use access.
cloud_credential_pk = get_pk_from_dict(data, 'cloud_credential')
if cloud_credential_pk:
cloud_credential = get_object_or_400(Credential,
pk=cloud_credential_pk)
if self.user not in cloud_credential.read_role:
if self.user not in cloud_credential.use_role:
return False
# Check that the given inventory ID is valid.
# If a network credential is provided, the user should have use access.
network_credential_pk = get_pk_from_dict(data, 'network_credential')
if network_credential_pk:
network_credential = get_object_or_400(Credential,
pk=network_credential_pk)
if self.user not in network_credential.use_role:
return False
# If an inventory is provided, the user should have use access.
inventory_pk = get_pk_from_dict(data, 'inventory')
inventory = Inventory.objects.filter(id=inventory_pk)
if not inventory.exists() and not data.get('ask_inventory_on_launch', False):
return False
if inventory.exists() and self.user not in inventory[0].use_role:
return False
if inventory_pk:
inventory = get_object_or_400(Inventory, pk=inventory_pk)
if self.user not in inventory.use_role:
return False
project_pk = get_pk_from_dict(data, 'project')
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
@ -838,7 +845,7 @@ class JobTemplateAccess(BaseAccess):
def changes_are_non_sensitive(self, obj, data):
'''
Returne true if the changes being made are considered nonsensitive, and
Return true if the changes being made are considered nonsensitive, and
thus can be made by a job template administrator which may not have access
to the any inventory, project, or credentials associated with the template.
'''

View File

@ -179,7 +179,7 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
"organization",
"inventory",
"project",
"credential",
"credential", "cloud_credential", "network_credential",
"job_type",
"survey",], kwargs)
@ -187,6 +187,8 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
proj = None
inv = None
cred = None
cloud_cred = None
net_cred = None
spec = None
jobs = {}
job_type = kwargs.get('job_type', 'run')
@ -202,6 +204,16 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
if type(cred) is not Credential:
cred = mk_credential(cred, persisted=persisted)
if 'cloud_credential' in kwargs:
cloud_cred = kwargs['cloud_credential']
if type(cloud_cred) is not Credential:
cloud_cred = mk_credential(cloud_cred, kind='aws', persisted=persisted)
if 'network_credential' in kwargs:
net_cred = kwargs['network_credential']
if type(net_cred) is not Credential:
net_cred = mk_credential(net_cred, kind='net', persisted=persisted)
if 'project' in kwargs:
proj = kwargs['project']
if type(proj) is not Project:
@ -240,7 +252,7 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
jobs=jobs,
project=proj,
inventory=inv,
credential=cred,
credential=cred, cloud_credential=cloud_cred, network_credential=net_cred,
job_type=job_type,
organization=org,
survey=spec,)

View File

@ -78,11 +78,15 @@ def test_job_template_factory(job_template_factory):
jt_objects = job_template_factory('testJT', organization='org1',
project='proj1', inventory='inventory1',
credential='cred1', survey='test-survey',
cloud_credential='aws1',
network_credential='juniper1',
jobs=[1])
assert jt_objects.job_template.name == 'testJT'
assert jt_objects.project.name == 'proj1'
assert jt_objects.inventory.name == 'inventory1'
assert jt_objects.credential.name == 'cred1'
assert jt_objects.cloud_credential.name == 'aws1'
assert jt_objects.network_credential.name == 'juniper1'
assert jt_objects.inventory.organization.name == 'org1'
assert jt_objects.job_template.survey_enabled is True
assert jt_objects.job_template.survey_spec is not None

View File

@ -13,6 +13,13 @@ from django.apps import apps
from django.core.urlresolvers import reverse
@pytest.fixture
def jt_objects(job_template_factory):
objects = job_template_factory(
'testJT', organization='org1', project='proj1', inventory='inventory1',
credential='cred1', cloud_credential='aws1', network_credential='juniper1')
return objects
@pytest.mark.django_db
def test_job_template_migration_check(credential, deploy_jobtemplate, check_jobtemplate, user):
admin = user('admin', is_superuser=True)
@ -159,6 +166,58 @@ def test_job_template_access_superuser(check_license, user, deploy_jobtemplate):
assert access.can_read(deploy_jobtemplate)
assert access.can_add({})
@pytest.mark.django_db
def test_job_template_access_read_level(jt_objects, rando):
access = JobTemplateAccess(rando)
jt_objects.project.read_role.members.add(rando)
jt_objects.inventory.read_role.members.add(rando)
jt_objects.credential.read_role.members.add(rando)
jt_objects.cloud_credential.read_role.members.add(rando)
jt_objects.network_credential.read_role.members.add(rando)
proj_pk = jt_objects.project.pk
assert not access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
assert not access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
assert not access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
assert not access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
@pytest.mark.django_db
def test_job_template_access_use_level(jt_objects, rando):
access = JobTemplateAccess(rando)
jt_objects.project.use_role.members.add(rando)
jt_objects.inventory.use_role.members.add(rando)
jt_objects.credential.use_role.members.add(rando)
jt_objects.cloud_credential.use_role.members.add(rando)
jt_objects.network_credential.use_role.members.add(rando)
proj_pk = jt_objects.project.pk
assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
@pytest.mark.django_db
def test_job_template_access_org_admin(jt_objects, rando):
access = JobTemplateAccess(rando)
# Appoint this user as admin of the organization
jt_objects.inventory.organization.admin_role.members.add(rando)
# Assign organization permission in the same way the create view does
organization = jt_objects.inventory.organization
jt_objects.credential.owner_role.parents.add(organization.admin_role)
jt_objects.cloud_credential.owner_role.parents.add(organization.admin_role)
jt_objects.network_credential.owner_role.parents.add(organization.admin_role)
proj_pk = jt_objects.project.pk
assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
assert access.can_read(jt_objects.job_template)
assert access.can_delete(jt_objects.job_template)
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_job_template_creator_access(project, rando, post):