From 3f49d2c455168b405dcd377a23c13cc91bd72dab Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 23 Oct 2019 10:59:35 -0400 Subject: [PATCH] RBAC relaunch 403 updates (#3835) * RBAC relaunch 403 updates Addresses 2 things 1. If WFJ relaunch is attempted, and relaunch is denied because the WFJ had encrypted survey answers, a generic message was shown, this changes it to show a specific error message 2. Org admins are banned from relaunching a job if the job has encrypted survey answers * update tests to raises access pattern * catch PermissionDenied for user_capabilities --- awx/main/access.py | 50 ++++++++----------- awx/main/tests/functional/test_rbac_job.py | 5 +- .../tests/functional/test_rbac_workflow.py | 5 +- 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 3eefb08723..7f9be65333 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -465,7 +465,7 @@ class BaseAccess(object): else: relationship = 'members' return access_method(obj, parent_obj, relationship, skip_sub_obj_read_check=True, data={}) - except (ParseError, ObjectDoesNotExist): + except (ParseError, ObjectDoesNotExist, PermissionDenied): return False return False @@ -1660,26 +1660,19 @@ class JobAccess(BaseAccess): except JobLaunchConfig.DoesNotExist: config = None + if obj.job_template and (self.user not in obj.job_template.execute_role): + return False + # Check if JT execute access (and related prompts) is sufficient - if obj.job_template is not None: - if config is None: - prompts_access = False - elif not config.has_user_prompts(obj.job_template): - prompts_access = True - elif obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data): - prompts_access = False - if self.save_messages: - self.messages['detail'] = _('Job was launched with secret prompts provided by another user.') - else: - prompts_access = ( - JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}) and - not config.has_unprompted(obj.job_template) - ) - jt_access = self.user in obj.job_template.execute_role - if prompts_access and jt_access: + if config and obj.job_template: + if not config.has_user_prompts(obj.job_template): return True - elif not jt_access: - return False + elif obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data): + # never allowed, not even for org admins + raise PermissionDenied(_('Job was launched with secret prompts provided by another user.')) + elif not config.has_unprompted(obj.job_template): + if JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}): + return True org_access = bool(obj.inventory) and self.user in obj.inventory.organization.inventory_admin_role project_access = obj.project is None or self.user in obj.project.admin_role @@ -2098,23 +2091,20 @@ class WorkflowJobAccess(BaseAccess): self.messages['detail'] = _('Workflow Job was launched with unknown prompts.') return False + # execute permission to WFJT is mandatory for any relaunch + if self.user not in template.execute_role: + return False + # Check if access to prompts to prevent relaunch if config.prompts_dict(): if obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data): - if self.save_messages: - self.messages['detail'] = _('Job was launched with secret prompts provided by another user.') - return False + raise PermissionDenied(_("Job was launched with secret prompts provided by another user.")) if not JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}): - if self.save_messages: - self.messages['detail'] = _('Job was launched with prompts you lack access to.') - return False + raise PermissionDenied(_('Job was launched with prompts you lack access to.')) if config.has_unprompted(template): - if self.save_messages: - self.messages['detail'] = _('Job was launched with prompts no longer accepted.') - return False + raise PermissionDenied(_('Job was launched with prompts no longer accepted.')) - # execute permission to WFJT is mandatory for any relaunch - return (self.user in template.execute_role) + return True # passed config checks def can_recreate(self, obj): node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template') diff --git a/awx/main/tests/functional/test_rbac_job.py b/awx/main/tests/functional/test_rbac_job.py index 13866565fa..5dbe797f6c 100644 --- a/awx/main/tests/functional/test_rbac_job.py +++ b/awx/main/tests/functional/test_rbac_job.py @@ -19,6 +19,8 @@ from awx.main.models import ( Credential ) +from rest_framework.exceptions import PermissionDenied + from crum import impersonate @@ -252,7 +254,8 @@ class TestJobRelaunchAccess: assert 'job_var' in job.launch_config.extra_data assert bob.can_access(Job, 'start', job, validate_license=False) - assert not alice.can_access(Job, 'start', job, validate_license=False) + with pytest.raises(PermissionDenied): + alice.can_access(Job, 'start', job, validate_license=False) @pytest.mark.django_db diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index 04926385c0..a53d769324 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -7,6 +7,8 @@ from awx.main.access import ( # WorkflowJobNodeAccess ) +from rest_framework.exceptions import PermissionDenied + from awx.main.models import InventorySource, JobLaunchConfig @@ -169,7 +171,8 @@ class TestWorkflowJobAccess: wfjt.ask_inventory_on_launch = True wfjt.save() JobLaunchConfig.objects.create(job=workflow_job, inventory=inventory) - assert not WorkflowJobAccess(rando).can_start(workflow_job) + with pytest.raises(PermissionDenied): + WorkflowJobAccess(rando).can_start(workflow_job) inventory.use_role.members.add(rando) assert WorkflowJobAccess(rando).can_start(workflow_job)