From a431f16953262e92ec9878865af37d50f53e5197 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 9 Jun 2016 11:13:34 -0400 Subject: [PATCH] job template creation to require use_role, include network creds --- awx/main/access.py | 29 +++++---- awx/main/tests/factories/tower.py | 16 ++++- .../functional/test_fixture_factories.py | 4 ++ .../functional/test_rbac_job_templates.py | 59 +++++++++++++++++++ 4 files changed, 95 insertions(+), 13 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 231ece8042..d82a04943a 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -757,28 +757,35 @@ class JobTemplateAccess(BaseAccess): if self.user.is_superuser: return True - # If a credential is provided, the user should have read access to it. + # If a credential is provided, the user should have use access to it. credential_pk = get_pk_from_dict(data, 'credential') if credential_pk: credential = get_object_or_400(Credential, pk=credential_pk) - if self.user not in credential.read_role: + if self.user not in credential.use_role: return False - # If a cloud credential is provided, the user should have read access. + # If a cloud credential is provided, the user should have use access. cloud_credential_pk = get_pk_from_dict(data, 'cloud_credential') if cloud_credential_pk: cloud_credential = get_object_or_400(Credential, pk=cloud_credential_pk) - if self.user not in cloud_credential.read_role: + if self.user not in cloud_credential.use_role: return False - # Check that the given inventory ID is valid. + # If a network credential is provided, the user should have use access. + network_credential_pk = get_pk_from_dict(data, 'network_credential') + if network_credential_pk: + network_credential = get_object_or_400(Credential, + pk=network_credential_pk) + if self.user not in network_credential.use_role: + return False + + # If an inventory is provided, the user should have use access. inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = Inventory.objects.filter(id=inventory_pk) - if not inventory.exists() and not data.get('ask_inventory_on_launch', False): - return False - if inventory.exists() and self.user not in inventory[0].use_role: - return False + if inventory_pk: + inventory = get_object_or_400(Inventory, pk=inventory_pk) + if self.user not in inventory.use_role: + return False project_pk = get_pk_from_dict(data, 'project') if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN: @@ -838,7 +845,7 @@ class JobTemplateAccess(BaseAccess): def changes_are_non_sensitive(self, obj, data): ''' - Returne true if the changes being made are considered nonsensitive, and + Return true if the changes being made are considered nonsensitive, and thus can be made by a job template administrator which may not have access to the any inventory, project, or credentials associated with the template. ''' diff --git a/awx/main/tests/factories/tower.py b/awx/main/tests/factories/tower.py index 5247f4195a..59af0b997d 100644 --- a/awx/main/tests/factories/tower.py +++ b/awx/main/tests/factories/tower.py @@ -179,7 +179,7 @@ def create_job_template(name, roles=None, persisted=True, **kwargs): "organization", "inventory", "project", - "credential", + "credential", "cloud_credential", "network_credential", "job_type", "survey",], kwargs) @@ -187,6 +187,8 @@ def create_job_template(name, roles=None, persisted=True, **kwargs): proj = None inv = None cred = None + cloud_cred = None + net_cred = None spec = None jobs = {} job_type = kwargs.get('job_type', 'run') @@ -202,6 +204,16 @@ def create_job_template(name, roles=None, persisted=True, **kwargs): if type(cred) is not Credential: cred = mk_credential(cred, persisted=persisted) + if 'cloud_credential' in kwargs: + cloud_cred = kwargs['cloud_credential'] + if type(cloud_cred) is not Credential: + cloud_cred = mk_credential(cloud_cred, kind='aws', persisted=persisted) + + if 'network_credential' in kwargs: + net_cred = kwargs['network_credential'] + if type(net_cred) is not Credential: + net_cred = mk_credential(net_cred, kind='net', persisted=persisted) + if 'project' in kwargs: proj = kwargs['project'] if type(proj) is not Project: @@ -240,7 +252,7 @@ def create_job_template(name, roles=None, persisted=True, **kwargs): jobs=jobs, project=proj, inventory=inv, - credential=cred, + credential=cred, cloud_credential=cloud_cred, network_credential=net_cred, job_type=job_type, organization=org, survey=spec,) diff --git a/awx/main/tests/functional/test_fixture_factories.py b/awx/main/tests/functional/test_fixture_factories.py index bb6dd6cd63..1c25f9d38d 100644 --- a/awx/main/tests/functional/test_fixture_factories.py +++ b/awx/main/tests/functional/test_fixture_factories.py @@ -78,11 +78,15 @@ def test_job_template_factory(job_template_factory): jt_objects = job_template_factory('testJT', organization='org1', project='proj1', inventory='inventory1', credential='cred1', survey='test-survey', + cloud_credential='aws1', + network_credential='juniper1', jobs=[1]) assert jt_objects.job_template.name == 'testJT' assert jt_objects.project.name == 'proj1' assert jt_objects.inventory.name == 'inventory1' assert jt_objects.credential.name == 'cred1' + assert jt_objects.cloud_credential.name == 'aws1' + assert jt_objects.network_credential.name == 'juniper1' assert jt_objects.inventory.organization.name == 'org1' assert jt_objects.job_template.survey_enabled is True assert jt_objects.job_template.survey_spec is not None diff --git a/awx/main/tests/functional/test_rbac_job_templates.py b/awx/main/tests/functional/test_rbac_job_templates.py index 3c7a7b32fc..28d4571378 100644 --- a/awx/main/tests/functional/test_rbac_job_templates.py +++ b/awx/main/tests/functional/test_rbac_job_templates.py @@ -13,6 +13,13 @@ from django.apps import apps from django.core.urlresolvers import reverse +@pytest.fixture +def jt_objects(job_template_factory): + objects = job_template_factory( + 'testJT', organization='org1', project='proj1', inventory='inventory1', + credential='cred1', cloud_credential='aws1', network_credential='juniper1') + return objects + @pytest.mark.django_db def test_job_template_migration_check(credential, deploy_jobtemplate, check_jobtemplate, user): admin = user('admin', is_superuser=True) @@ -159,6 +166,58 @@ def test_job_template_access_superuser(check_license, user, deploy_jobtemplate): assert access.can_read(deploy_jobtemplate) assert access.can_add({}) +@pytest.mark.django_db +def test_job_template_access_read_level(jt_objects, rando): + + access = JobTemplateAccess(rando) + jt_objects.project.read_role.members.add(rando) + jt_objects.inventory.read_role.members.add(rando) + jt_objects.credential.read_role.members.add(rando) + jt_objects.cloud_credential.read_role.members.add(rando) + jt_objects.network_credential.read_role.members.add(rando) + + proj_pk = jt_objects.project.pk + assert not access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk)) + assert not access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk)) + assert not access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk)) + assert not access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk)) + +@pytest.mark.django_db +def test_job_template_access_use_level(jt_objects, rando): + + access = JobTemplateAccess(rando) + jt_objects.project.use_role.members.add(rando) + jt_objects.inventory.use_role.members.add(rando) + jt_objects.credential.use_role.members.add(rando) + jt_objects.cloud_credential.use_role.members.add(rando) + jt_objects.network_credential.use_role.members.add(rando) + + proj_pk = jt_objects.project.pk + assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk)) + assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk)) + assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk)) + assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk)) + +@pytest.mark.django_db +def test_job_template_access_org_admin(jt_objects, rando): + access = JobTemplateAccess(rando) + # Appoint this user as admin of the organization + jt_objects.inventory.organization.admin_role.members.add(rando) + # Assign organization permission in the same way the create view does + organization = jt_objects.inventory.organization + jt_objects.credential.owner_role.parents.add(organization.admin_role) + jt_objects.cloud_credential.owner_role.parents.add(organization.admin_role) + jt_objects.network_credential.owner_role.parents.add(organization.admin_role) + + proj_pk = jt_objects.project.pk + assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk)) + assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk)) + assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk)) + assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk)) + + assert access.can_read(jt_objects.job_template) + assert access.can_delete(jt_objects.job_template) + @pytest.mark.django_db @pytest.mark.job_permissions def test_job_template_creator_access(project, rando, post):