mirror of
https://github.com/ansible/awx.git
synced 2024-10-26 16:25:06 +03:00
Various RBAC fixes related to managed RoleDefinitions (#15287)
* Add migration testing for certain managed roles * Fix managed role bugs * Add more tests * Fix another bug with org workflow admin role reference * Add test because another issue is fixed * Mark reason for test * Remove internal markers * Reword failure message Co-authored-by: Seth Foster <fosterseth@users.noreply.github.com> --------- Co-authored-by: Seth Foster <fosterseth@users.noreply.github.com>
This commit is contained in:
parent
4738c8333a
commit
853af295d9
@ -292,12 +292,13 @@ def setup_managed_role_definitions(apps, schema_editor):
|
||||
org_perms = set()
|
||||
for cls in permission_registry.all_registered_models:
|
||||
ct = ContentType.objects.get_for_model(cls)
|
||||
cls_name = cls._meta.model_name
|
||||
object_perms = set(Permission.objects.filter(content_type=ct))
|
||||
# Special case for InstanceGroup which has an organiation field, but is not an organization child object
|
||||
if cls._meta.model_name != 'instancegroup':
|
||||
if cls_name != 'instancegroup':
|
||||
org_perms.update(object_perms)
|
||||
|
||||
if 'object_admin' in to_create and cls != Organization:
|
||||
if 'object_admin' in to_create and cls_name != 'organization':
|
||||
indiv_perms = object_perms.copy()
|
||||
add_perms = [perm for perm in indiv_perms if perm.codename.startswith('add_')]
|
||||
if add_perms:
|
||||
@ -310,7 +311,7 @@ def setup_managed_role_definitions(apps, schema_editor):
|
||||
)
|
||||
)
|
||||
|
||||
if 'org_children' in to_create and cls != Organization:
|
||||
if 'org_children' in to_create and (cls_name not in ('organization', 'instancegroup', 'team')):
|
||||
org_child_perms = object_perms.copy()
|
||||
org_child_perms.add(Permission.objects.get(codename='view_organization'))
|
||||
|
||||
@ -327,7 +328,7 @@ def setup_managed_role_definitions(apps, schema_editor):
|
||||
if 'special' in to_create:
|
||||
special_perms = []
|
||||
for perm in object_perms:
|
||||
if perm.codename.split('_')[0] not in ('add', 'change', 'update', 'delete', 'view'):
|
||||
if perm.codename.split('_')[0] not in ('add', 'change', 'delete', 'view'):
|
||||
special_perms.append(perm)
|
||||
for perm in special_perms:
|
||||
action = perm.codename.split('_')[0]
|
||||
|
@ -591,8 +591,13 @@ def get_role_from_object_role(object_role):
|
||||
role_name = role_name.lower()
|
||||
model_cls = apps.get_model('main', target_model_name)
|
||||
target_model_name = get_type_for_model(model_cls)
|
||||
|
||||
# exception cases completely specific to one model naming convention
|
||||
if target_model_name == 'notification_template':
|
||||
target_model_name = 'notification' # total exception
|
||||
target_model_name = 'notification'
|
||||
elif target_model_name == 'workflow_job_template':
|
||||
target_model_name = 'workflow'
|
||||
|
||||
role_name = f'{target_model_name}_admin_role'
|
||||
elif rd.name.endswith(' Admin'):
|
||||
# cases like "project-admin"
|
||||
|
@ -109,3 +109,17 @@ def test_team_indirect_access(get, team, admin_user, inventory):
|
||||
assert len(by_username['u1']['summary_fields']['indirect_access']) == 0
|
||||
access_entry = by_username['u1']['summary_fields']['direct_access'][0]
|
||||
assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_workflow_access_list(workflow_job_template, alice, bob, setup_managed_roles, get, admin_user):
|
||||
"""Basic verification that WFJT access_list is functional"""
|
||||
workflow_job_template.admin_role.members.add(alice)
|
||||
workflow_job_template.organization.workflow_admin_role.members.add(bob)
|
||||
|
||||
url = reverse('api:workflow_job_template_access_list', kwargs={'pk': workflow_job_template.pk})
|
||||
for u in (alice, bob, admin_user):
|
||||
response = get(url, user=u, expect=200)
|
||||
user_ids = [item['id'] for item in response.data['results']]
|
||||
assert alice.pk in user_ids
|
||||
assert bob.pk in user_ids
|
||||
|
@ -5,6 +5,7 @@ from django.urls import reverse as django_reverse
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
from awx.main.models import JobTemplate, Inventory, Organization
|
||||
from awx.main.access import JobTemplateAccess, WorkflowJobTemplateAccess
|
||||
|
||||
from ansible_base.rbac.models import RoleDefinition
|
||||
|
||||
@ -88,3 +89,32 @@ def test_assign_custom_add_role(admin_user, rando, organization, post, setup_man
|
||||
inv_id = r.data['id']
|
||||
inventory = Inventory.objects.get(id=inv_id)
|
||||
assert rando.has_obj_perm(inventory, 'change')
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_jt_creation_permissions(setup_managed_roles, inventory, project, rando):
|
||||
"""This tests that if you assign someone required permissions in the new API
|
||||
using the managed roles, then that works to give permissions to create a job template"""
|
||||
inv_rd = RoleDefinition.objects.get(name='Inventory Admin')
|
||||
proj_rd = RoleDefinition.objects.get(name='Project Admin')
|
||||
# establish prior state
|
||||
access = JobTemplateAccess(rando)
|
||||
assert not access.can_add({'inventory': inventory.pk, 'project': project.pk, 'name': 'foo-jt'})
|
||||
|
||||
inv_rd.give_permission(rando, inventory)
|
||||
proj_rd.give_permission(rando, project)
|
||||
|
||||
assert access.can_add({'inventory': inventory.pk, 'project': project.pk, 'name': 'foo-jt'})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_workflow_creation_permissions(setup_managed_roles, organization, workflow_job_template, rando):
|
||||
"""Similar to JT, assigning new roles gives creator permissions"""
|
||||
org_wf_rd = RoleDefinition.objects.get(name='Organization WorkflowJobTemplate Admin')
|
||||
assert workflow_job_template.organization == organization # sanity
|
||||
# establish prior state
|
||||
access = WorkflowJobTemplateAccess(rando)
|
||||
assert not access.can_add({'name': 'foo-flow', 'organization': organization.pk})
|
||||
org_wf_rd.give_permission(rando, organization)
|
||||
|
||||
assert access.can_add({'name': 'foo-flow', 'organization': organization.pk})
|
||||
|
31
awx/main/tests/functional/dab_rbac/test_managed_roles.py
Normal file
31
awx/main/tests/functional/dab_rbac/test_managed_roles.py
Normal file
@ -0,0 +1,31 @@
|
||||
import pytest
|
||||
|
||||
from ansible_base.rbac.models import RoleDefinition, DABPermission
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_roles_to_not_create(setup_managed_roles):
|
||||
assert RoleDefinition.objects.filter(name='Organization Admin').count() == 1
|
||||
|
||||
SHOULD_NOT_EXIST = ('Organization Organization Admin', 'Organization Team Admin', 'Organization InstanceGroup Admin')
|
||||
|
||||
bad_rds = RoleDefinition.objects.filter(name__in=SHOULD_NOT_EXIST)
|
||||
if bad_rds.exists():
|
||||
bad_names = list(bad_rds.values_list('name', flat=True))
|
||||
raise Exception(f'Found RoleDefinitions that should not exist: {bad_names}')
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_project_update_role(setup_managed_roles):
|
||||
"""Role to allow updating a project on the object-level should exist"""
|
||||
assert RoleDefinition.objects.filter(name='Project Update').count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_org_child_add_permission(setup_managed_roles):
|
||||
for model_name in ('Project', 'NotificationTemplate', 'WorkflowJobTemplate', 'Inventory'):
|
||||
rd = RoleDefinition.objects.get(name=f'Organization {model_name} Admin')
|
||||
assert 'add_' in str(rd.permissions.values_list('codename', flat=True)), f'The {rd.name} role definition expected to contain add_ permissions'
|
||||
|
||||
# special case for JobTemplate, anyone can create one with use permission to project/inventory
|
||||
assert not DABPermission.objects.filter(codename='add_jobtemplate').exists()
|
@ -16,7 +16,16 @@ from ansible_base.rbac.models import RoleUserAssignment, RoleDefinition
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
'role_name',
|
||||
['execution_environment_admin_role', 'project_admin_role', 'admin_role', 'auditor_role', 'read_role', 'execute_role', 'notification_admin_role'],
|
||||
[
|
||||
'execution_environment_admin_role',
|
||||
'workflow_admin_role',
|
||||
'project_admin_role',
|
||||
'admin_role',
|
||||
'auditor_role',
|
||||
'read_role',
|
||||
'execute_role',
|
||||
'notification_admin_role',
|
||||
],
|
||||
)
|
||||
def test_round_trip_roles(organization, rando, role_name, setup_managed_roles):
|
||||
"""
|
||||
|
@ -85,3 +85,10 @@ class TestMigrationSmoke:
|
||||
|
||||
RoleUserAssignment = new_state.apps.get_model('dab_rbac', 'RoleUserAssignment')
|
||||
assert RoleUserAssignment.objects.filter(user=user.id, object_id=org.id).exists()
|
||||
|
||||
# Regression testing for bug that comes from current vs past models mismatch
|
||||
RoleDefinition = new_state.apps.get_model('dab_rbac', 'RoleDefinition')
|
||||
assert not RoleDefinition.objects.filter(name='Organization Organization Admin').exists()
|
||||
# Test special cases in managed role creation
|
||||
assert not RoleDefinition.objects.filter(name='Organization Team Admin').exists()
|
||||
assert not RoleDefinition.objects.filter(name='Organization InstanceGroup Admin').exists()
|
||||
|
Loading…
Reference in New Issue
Block a user