1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

Merge pull request #1286 from AlanCoding/remove_user_roles

Remove the "user admin role" entirely
This commit is contained in:
Alan Rominger 2018-04-16 07:33:55 -04:00 committed by GitHub
commit 900ea14883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 44 additions and 203 deletions

View File

@ -1683,14 +1683,8 @@ class UserRolesList(SubListAttachDetachAPIView):
if not sub_id:
return super(UserRolesList, self).post(request)
if sub_id == self.request.user.admin_role.pk:
raise PermissionDenied(_('You may not perform any action with your own admin_role.'))
user = get_object_or_400(User, pk=self.kwargs['pk'])
role = get_object_or_400(Role, pk=sub_id)
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied(_('You may not change the membership of a users admin_role'))
credential_content_type = ContentType.objects.get_for_model(Credential)
if role.content_type == credential_content_type:
@ -4909,12 +4903,6 @@ class RoleUsersList(SubListAttachDetachAPIView):
user = get_object_or_400(User, pk=sub_id)
role = self.get_parent_object()
if role == self.request.user.admin_role:
raise PermissionDenied(_('You may not perform any action with your own admin_role.'))
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied(_('You may not change the membership of a users admin_role'))
credential_content_type = ContentType.objects.get_for_model(Credential)
if role.content_type == credential_content_type:

View File

@ -33,8 +33,7 @@ from awx.main.models.mixins import ResourceMixin
from awx.conf.license import LicenseForbids, feature_enabled
__all__ = ['get_user_queryset', 'check_user_access', 'check_user_access_with_errors',
'user_accessible_objects', 'consumer_access',
'user_admin_role',]
'user_accessible_objects', 'consumer_access',]
logger = logging.getLogger('awx.main.access')
@ -78,18 +77,6 @@ def register_access(model_class, access_class):
access_registry[model_class] = access_class
@property
def user_admin_role(self):
role = Role.objects.get(
content_type=ContentType.objects.get_for_model(User),
object_id=self.id,
role_field='admin_role'
)
# Trick the user.admin_role so that the signal filtering for RBAC activity stream works as intended.
role.parents = [org.admin_role.pk for org in self.organizations]
return role
def user_accessible_objects(user, role_name):
return ResourceMixin._accessible_objects(User, user, role_name)

View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-04-02 19:18
from __future__ import unicode_literals
from django.db import migrations
from awx.main.migrations import ActivityStreamDisabledMigration
from awx.main.migrations._rbac import delete_all_user_roles, rebuild_role_hierarchy
from awx.main.migrations import _migration_utils as migration_utils
class Migration(ActivityStreamDisabledMigration):
dependencies = [
('main', '0031_v330_oauth_help_text'),
]
operations = [
migrations.RunPython(migration_utils.set_current_apps_for_migrations),
migrations.RunPython(delete_all_user_roles),
migrations.RunPython(rebuild_role_hierarchy),
]

View File

@ -500,3 +500,12 @@ def infer_credential_org_from_team(apps, schema_editor):
_update_credential_parents(cred.deprecated_team.organization, cred)
except IntegrityError:
logger.info("Organization<{}> credential for old Team<{}> credential already created".format(cred.deprecated_team.organization.pk, cred.pk))
def delete_all_user_roles(apps, schema_editor):
ContentType = apps.get_model('contenttypes', "ContentType")
Role = apps.get_model('main', "Role")
User = apps.get_model('auth', "User")
user_content_type = ContentType.objects.get_for_model(User)
for role in Role.objects.filter(content_type=user_content_type).iterator():
role.delete()

View File

@ -56,7 +56,6 @@ User.add_to_class('get_queryset', get_user_queryset)
User.add_to_class('can_access', check_user_access)
User.add_to_class('can_access_with_errors', check_user_access_with_errors)
User.add_to_class('accessible_objects', user_accessible_objects)
User.add_to_class('admin_role', user_admin_role)
@property

View File

@ -172,46 +172,6 @@ def sync_superuser_status_to_rbac(instance, **kwargs):
Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance)
def create_user_role(instance, **kwargs):
if not kwargs.get('created', True):
return
try:
Role.objects.get(
content_type=ContentType.objects.get_for_model(instance),
object_id=instance.id,
role_field='admin_role'
)
except Role.DoesNotExist:
role = Role.objects.create(
role_field='admin_role',
content_object = instance,
)
role.members.add(instance)
def delete_user_role(instance, **kwargs):
if instance and instance.admin_role:
instance.admin_role.delete()
else:
logger.info(six.text_type("Could not delete the admin role for user {}").format(instance))
def org_admin_edit_members(instance, action, model, reverse, pk_set, **kwargs):
content_type = ContentType.objects.get_for_model(Organization)
if reverse:
return
else:
if instance.content_type == content_type and \
instance.content_object.member_role.id == instance.id:
items = model.objects.filter(pk__in=pk_set).all()
for user in items:
if action == 'post_add':
instance.content_object.admin_role.children.add(user.admin_role)
if action == 'pre_remove':
instance.content_object.admin_role.children.remove(user.admin_role)
def rbac_activity_stream(instance, sender, **kwargs):
user_type = ContentType.objects.get_for_model(User)
# Only if we are associating/disassociating
@ -290,12 +250,9 @@ post_save.connect(emit_project_update_event_detail, sender=ProjectUpdateEvent)
post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent)
post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(org_admin_edit_members, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)
post_save.connect(sync_superuser_status_to_rbac, sender=User)
post_save.connect(create_user_role, sender=User)
pre_delete.connect(delete_user_role, sender=User)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate)

View File

@ -196,12 +196,6 @@ class TestAccessListCapabilities:
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_user_access_list_direct_access_capability(self, rando, get):
"When a user views their own access list, they cannot unattach their admin role"
response = get(reverse('api:user_access_list', kwargs={'pk': rando.id}), rando)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert not direct_access_list[0]['role']['user_capabilities']['unattach']
@pytest.mark.django_db
def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get):

View File

@ -56,7 +56,6 @@ def test_get_roles_list_user(organization, inventory, team, get, user):
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash
assert organization.admin_role.id in role_hash
assert organization.member_role.id in role_hash
assert this_user.admin_role.id in role_hash
assert custom_role.id in role_hash
assert inventory.admin_role.id not in role_hash
@ -99,12 +98,12 @@ def test_cant_create_role(post, admin):
@pytest.mark.django_db
def test_cant_delete_role(delete, admin):
def test_cant_delete_role(delete, admin, inventory):
"Ensure we can't delete roles through the api"
# Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or
# another
response = delete(reverse('api:role_detail', kwargs={'pk': admin.admin_role.id}), admin)
response = delete(reverse('api:role_detail', kwargs={'pk': inventory.admin_role.id}), admin)
assert response.status_code == 405

View File

@ -1,10 +1,9 @@
import pytest
from django.test import TransactionTestCase
from django.contrib.contenttypes.models import ContentType
from awx.main.access import UserAccess
from awx.main.models import User, Organization, Inventory, Role
from awx.main.models import User, Organization, Inventory
@pytest.mark.django_db
@ -62,66 +61,21 @@ def test_user_queryset(user):
@pytest.mark.django_db
def test_user_accessible_objects(user, organization):
'''
We cannot directly use accessible_objects for User model because
both editing and read permissions are obligated to complex business logic
'''
admin = user('admin', False)
u = user('john', False)
assert User.accessible_objects(admin, 'admin_role').count() == 1
access = UserAccess(admin)
assert access.get_queryset().count() == 1 # can only see himself
organization.member_role.members.add(u)
organization.admin_role.members.add(admin)
assert User.accessible_objects(admin, 'admin_role').count() == 2
organization.member_role.members.add(admin)
assert access.get_queryset().count() == 2
organization.member_role.members.remove(u)
assert User.accessible_objects(admin, 'admin_role').count() == 1
@pytest.mark.django_db
def test_org_user_admin(user, organization):
admin = user('orgadmin')
member = user('orgmember')
organization.member_role.members.add(member)
assert admin not in member.admin_role
organization.admin_role.members.add(admin)
assert admin in member.admin_role
organization.admin_role.members.remove(admin)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_org_user_removed(user, organization):
admin = user('orgadmin')
member = user('orgmember')
organization.admin_role.members.add(admin)
organization.member_role.members.add(member)
assert admin in member.admin_role
organization.member_role.members.remove(member)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_create_user_role(rando):
assert Role.objects.filter(
role_field='admin_role',
content_type=ContentType.objects.get_for_model(User),
object_id=rando.id
).count() == 1
assert rando in rando.admin_role
@pytest.mark.django_db
def test_user_role_deleted(rando):
rando_id = rando.id
rando.delete()
assert not Role.objects.filter(
role_field='admin_role',
content_type=ContentType.objects.get_for_model(User),
object_id=rando_id
)
assert access.get_queryset().count() == 1
@pytest.mark.django_db

View File

@ -1,7 +1,4 @@
import mock
from mock import PropertyMock
import pytest
from rest_framework.test import APIRequestFactory
from rest_framework.test import force_authenticate
@ -9,8 +6,6 @@ from rest_framework.test import force_authenticate
from django.contrib.contenttypes.models import ContentType
from awx.api.views import (
RoleUsersList,
UserRolesList,
TeamRolesList,
)
@ -20,69 +15,6 @@ from awx.main.models import (
)
@pytest.mark.parametrize("pk, err", [
(111, "not change the membership"),
(1, "may not perform"),
])
def test_user_roles_list_user_admin_role(pk, err):
with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1, pk=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=role_mock):
factory = APIRequestFactory()
view = UserRolesList.as_view()
user = User(username="root", is_superuser=True, pk=1, id=1)
request = factory.post("/user/1/roles", {'id':pk}, format="json")
force_authenticate(request, user)
response = view(request, pk=user.pk)
response.render()
assert response.status_code == 403
assert err in response.content
@pytest.mark.parametrize("admin_role, err", [
(True, "may not perform"),
(False, "not change the membership"),
])
def test_role_users_list_other_user_admin_role(admin_role, err):
with mock.patch('awx.api.views.RoleUsersList.get_parent_object') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
user_admin_role = role_mock if admin_role else None
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=user_admin_role):
factory = APIRequestFactory()
view = RoleUsersList.as_view()
user = User(username="root", is_superuser=True, pk=1, id=1)
queried_user = User(username="maynard")
request = factory.post("/role/1/users", {'id':1}, format="json")
force_authenticate(request, user)
with mock.patch('awx.api.views.get_object_or_400', return_value=queried_user):
response = view(request)
response.render()
assert response.status_code == 403
assert err in response.content
def test_team_roles_list_post_org_roles():
with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get: