1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

RBAC relaunch 403 updates (#3835)

* RBAC relaunch 403 updates

Addresses 2 things

1. If WFJ relaunch is attempted, and relaunch is denied
  because the WFJ had encrypted survey answers,
  a generic message was shown, this changes it to show
  a specific error message

2. Org admins are banned from relaunching a job
  if the job has encrypted survey answers

* update tests to raises access pattern

* catch PermissionDenied for user_capabilities
This commit is contained in:
Alan Rominger 2019-10-23 10:59:35 -04:00 committed by Ryan Petrello
parent a0fb9bef3a
commit 3f49d2c455
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
3 changed files with 28 additions and 32 deletions

View File

@ -465,7 +465,7 @@ class BaseAccess(object):
else:
relationship = 'members'
return access_method(obj, parent_obj, relationship, skip_sub_obj_read_check=True, data={})
except (ParseError, ObjectDoesNotExist):
except (ParseError, ObjectDoesNotExist, PermissionDenied):
return False
return False
@ -1660,26 +1660,19 @@ class JobAccess(BaseAccess):
except JobLaunchConfig.DoesNotExist:
config = None
if obj.job_template and (self.user not in obj.job_template.execute_role):
return False
# Check if JT execute access (and related prompts) is sufficient
if obj.job_template is not None:
if config is None:
prompts_access = False
elif not config.has_user_prompts(obj.job_template):
prompts_access = True
elif obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data):
prompts_access = False
if self.save_messages:
self.messages['detail'] = _('Job was launched with secret prompts provided by another user.')
else:
prompts_access = (
JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}) and
not config.has_unprompted(obj.job_template)
)
jt_access = self.user in obj.job_template.execute_role
if prompts_access and jt_access:
if config and obj.job_template:
if not config.has_user_prompts(obj.job_template):
return True
elif not jt_access:
return False
elif obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data):
# never allowed, not even for org admins
raise PermissionDenied(_('Job was launched with secret prompts provided by another user.'))
elif not config.has_unprompted(obj.job_template):
if JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}):
return True
org_access = bool(obj.inventory) and self.user in obj.inventory.organization.inventory_admin_role
project_access = obj.project is None or self.user in obj.project.admin_role
@ -2098,23 +2091,20 @@ class WorkflowJobAccess(BaseAccess):
self.messages['detail'] = _('Workflow Job was launched with unknown prompts.')
return False
# execute permission to WFJT is mandatory for any relaunch
if self.user not in template.execute_role:
return False
# Check if access to prompts to prevent relaunch
if config.prompts_dict():
if obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data):
if self.save_messages:
self.messages['detail'] = _('Job was launched with secret prompts provided by another user.')
return False
raise PermissionDenied(_("Job was launched with secret prompts provided by another user."))
if not JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}):
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts you lack access to.')
return False
raise PermissionDenied(_('Job was launched with prompts you lack access to.'))
if config.has_unprompted(template):
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts no longer accepted.')
return False
raise PermissionDenied(_('Job was launched with prompts no longer accepted.'))
# execute permission to WFJT is mandatory for any relaunch
return (self.user in template.execute_role)
return True # passed config checks
def can_recreate(self, obj):
node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template')

View File

@ -19,6 +19,8 @@ from awx.main.models import (
Credential
)
from rest_framework.exceptions import PermissionDenied
from crum import impersonate
@ -252,7 +254,8 @@ class TestJobRelaunchAccess:
assert 'job_var' in job.launch_config.extra_data
assert bob.can_access(Job, 'start', job, validate_license=False)
assert not alice.can_access(Job, 'start', job, validate_license=False)
with pytest.raises(PermissionDenied):
alice.can_access(Job, 'start', job, validate_license=False)
@pytest.mark.django_db

View File

@ -7,6 +7,8 @@ from awx.main.access import (
# WorkflowJobNodeAccess
)
from rest_framework.exceptions import PermissionDenied
from awx.main.models import InventorySource, JobLaunchConfig
@ -169,7 +171,8 @@ class TestWorkflowJobAccess:
wfjt.ask_inventory_on_launch = True
wfjt.save()
JobLaunchConfig.objects.create(job=workflow_job, inventory=inventory)
assert not WorkflowJobAccess(rando).can_start(workflow_job)
with pytest.raises(PermissionDenied):
WorkflowJobAccess(rando).can_start(workflow_job)
inventory.use_role.members.add(rando)
assert WorkflowJobAccess(rando).can_start(workflow_job)