mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
Replaced our 'Resource' table with a GenericForeignKey in RolePermission
This commit is contained in:
parent
e9c3d98a44
commit
9aae2979d9
@ -16,6 +16,7 @@ import yaml
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import authenticate
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError as DjangoValidationError
|
||||
from django.db import models
|
||||
@ -293,8 +294,8 @@ class BaseSerializer(serializers.ModelSerializer):
|
||||
if getattr(obj, 'modified_by', None) and obj.modified_by.is_active:
|
||||
res['modified_by'] = reverse('api:user_detail', args=(obj.modified_by.pk,))
|
||||
if isinstance(obj, ResourceMixin):
|
||||
res['resource'] = reverse('api:resource_detail', args=(obj.resource_id,))
|
||||
res['resource_access_list'] = reverse('api:resource_access_list', args=(obj.resource_id,))
|
||||
content_type_id = ContentType.objects.get_for_model(obj).pk
|
||||
res['resource_access_list'] = reverse('api:resource_access_list', kwargs={'content_type_id': content_type_id, 'pk': obj.pk})
|
||||
return res
|
||||
|
||||
def _get_summary_fields(self, obj):
|
||||
@ -366,8 +367,8 @@ class BaseSerializer(serializers.ModelSerializer):
|
||||
return summary_fields
|
||||
|
||||
def get_resource_id(self, obj):
|
||||
if isinstance(obj, ResourceMixin):
|
||||
return obj.resource.id
|
||||
content_type_id = ContentType.objects.get_for_model(obj).pk
|
||||
return '%d/%d' % (content_type_id, obj.pk)
|
||||
return None
|
||||
|
||||
def get_created(self, obj):
|
||||
@ -1508,6 +1509,7 @@ class RoleSerializer(BaseSerializer):
|
||||
return ret
|
||||
|
||||
|
||||
"""
|
||||
class ResourceSerializer(BaseSerializer):
|
||||
|
||||
class Meta:
|
||||
@ -1529,16 +1531,19 @@ class ResourceSerializer(BaseSerializer):
|
||||
|
||||
return ret
|
||||
|
||||
"""
|
||||
|
||||
class ResourceAccessListElementSerializer(UserSerializer):
|
||||
|
||||
def to_representation(self, user):
|
||||
ret = super(ResourceAccessListElementSerializer, self).to_representation(user)
|
||||
resource_id = self.context['view'].resource_id
|
||||
resource = Resource.objects.get(pk=resource_id)
|
||||
content_type = ContentType.objects.get(pk=self.context['view'].content_type_id)
|
||||
object_id = self.context['view'].object_id
|
||||
obj = content_type.model_class().objects.get(pk=object_id)
|
||||
|
||||
if 'summary_fields' not in ret:
|
||||
ret['summary_fields'] = {}
|
||||
ret['summary_fields']['permissions'] = resource.get_permissions(user)
|
||||
ret['summary_fields']['permissions'] = get_user_permissions_on_resource(obj, user)
|
||||
|
||||
def format_role_perm(role):
|
||||
role_dict = { 'id': role.id, 'name': role.name, 'description': role.description}
|
||||
@ -1549,13 +1554,14 @@ class ResourceAccessListElementSerializer(UserSerializer):
|
||||
except:
|
||||
pass
|
||||
|
||||
return { 'role': role_dict, 'permissions': resource.get_role_permissions(role)}
|
||||
return { 'role': role_dict, 'permissions': get_role_permissions_on_resource(obj, role)}
|
||||
|
||||
direct_permissive_role_ids = resource.permissions.values_list('role__id')
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
direct_permissive_role_ids = RolePermission.objects.filter(content_type=content_type, object_id=obj.id).values_list('role__id')
|
||||
direct_access_roles = user.roles.filter(id__in=direct_permissive_role_ids).all()
|
||||
ret['summary_fields']['direct_access'] = [format_role_perm(r) for r in direct_access_roles]
|
||||
|
||||
all_permissive_role_ids = resource.permissions.values_list('role__ancestors__id')
|
||||
all_permissive_role_ids = RolePermission.objects.filter(content_type=content_type, object_id=obj.id).values_list('role__ancestors__id')
|
||||
indirect_access_roles = user.roles.filter(id__in=all_permissive_role_ids).exclude(id__in=direct_permissive_role_ids).all()
|
||||
ret['summary_fields']['indirect_access'] = [format_role_perm(r) for r in indirect_access_roles]
|
||||
return ret
|
||||
|
@ -163,9 +163,9 @@ role_urls = patterns('awx.api.views',
|
||||
)
|
||||
|
||||
resource_urls = patterns('awx.api.views',
|
||||
url(r'^$', 'resource_list'),
|
||||
url(r'^(?P<pk>[0-9]+)/$', 'resource_detail'),
|
||||
url(r'^(?P<pk>[0-9]+)/access_list/$', 'resource_access_list'),
|
||||
#url(r'^$', 'resource_list'),
|
||||
#url(r'^(?P<pk>[0-9]+)/$', 'resource_detail'),
|
||||
url(r'^(?P<content_type_id>[0-9]+)/(?P<pk>[0-9]+)/access_list/$', 'resource_access_list'),
|
||||
#url(r'^(?P<pk>[0-9]+)/users/$', 'resource_users_list'),
|
||||
#url(r'^(?P<pk>[0-9]+)/teams/$', 'resource_teams_list'),
|
||||
#url(r'^(?P<pk>[0-9]+)/roles/$', 'resource_teams_list'),
|
||||
|
@ -131,7 +131,6 @@ class ApiV1RootView(APIView):
|
||||
data['system_jobs'] = reverse('api:system_job_list')
|
||||
data['schedules'] = reverse('api:schedule_list')
|
||||
data['roles'] = reverse('api:role_list')
|
||||
data['resources'] = reverse('api:resource_list')
|
||||
data['notifiers'] = reverse('api:notifier_list')
|
||||
data['notifications'] = reverse('api:notification_list')
|
||||
data['unified_job_templates'] = reverse('api:unified_job_template_list')
|
||||
@ -3269,6 +3268,7 @@ class RoleChildrenList(SubListAPIView):
|
||||
role = Role.objects.get(pk=self.kwargs['pk'])
|
||||
return role.children
|
||||
|
||||
'''
|
||||
class ResourceDetail(RetrieveAPIView):
|
||||
|
||||
model = Resource
|
||||
@ -3290,6 +3290,8 @@ class ResourceList(ListAPIView):
|
||||
def get_queryset(self):
|
||||
return Resource.objects.filter(permissions__role__ancestors__members=self.request.user)
|
||||
|
||||
'''
|
||||
|
||||
class ResourceAccessList(ListAPIView):
|
||||
|
||||
model = User
|
||||
@ -3298,9 +3300,13 @@ class ResourceAccessList(ListAPIView):
|
||||
new_in_300 = True
|
||||
|
||||
def get_queryset(self):
|
||||
self.resource_id = self.kwargs['pk']
|
||||
resource = Resource.objects.get(pk=self.kwargs['pk'])
|
||||
roles = set([p.role for p in resource.permissions.all()])
|
||||
self.content_type_id = self.kwargs['content_type_id']
|
||||
self.object_id = self.kwargs['pk']
|
||||
#resource = Resource.objects.get(pk=self.kwargs['pk'])
|
||||
content_type = ContentType.objects.get(pk=self.content_type_id)
|
||||
obj = content_type.model_class().objects.get(pk=self.object_id)
|
||||
|
||||
roles = set([p.role for p in obj.role_permissions.all()])
|
||||
ancestors = set()
|
||||
for r in roles:
|
||||
ancestors.update(set(r.ancestors.all()))
|
||||
|
@ -1722,34 +1722,6 @@ class RoleAccess(BaseAccess):
|
||||
return False
|
||||
|
||||
|
||||
class ResourceAccess(BaseAccess):
|
||||
'''
|
||||
TODO: XXX: Needs implemenation
|
||||
'''
|
||||
|
||||
model = Role
|
||||
|
||||
def get_queryset(self):
|
||||
if self.user.is_superuser:
|
||||
return self.model.objects.all()
|
||||
return self.model.objects.none()
|
||||
|
||||
def can_change(self, obj, data):
|
||||
return self.user.is_superuser
|
||||
|
||||
def can_add(self, obj, data):
|
||||
return self.user.is_superuser
|
||||
|
||||
def can_attach(self, obj, sub_obj, relationship, data,
|
||||
skip_sub_obj_read_check=False):
|
||||
return self.user.is_superuser
|
||||
|
||||
def can_unattach(self, obj, sub_obj, relationship):
|
||||
return self.user.is_superuser
|
||||
|
||||
def can_delete(self, obj):
|
||||
return self.user.is_superuser
|
||||
|
||||
register_access(User, UserAccess)
|
||||
register_access(Organization, OrganizationAccess)
|
||||
register_access(Inventory, InventoryAccess)
|
||||
@ -1777,6 +1749,5 @@ register_access(ActivityStream, ActivityStreamAccess)
|
||||
register_access(CustomInventoryScript, CustomInventoryScriptAccess)
|
||||
register_access(TowerSettings, TowerSettingsAccess)
|
||||
register_access(Role, RoleAccess)
|
||||
register_access(Resource, ResourceAccess)
|
||||
register_access(Notifier, NotifierAccess)
|
||||
register_access(Notification, NotificationAccess)
|
||||
|
@ -1,6 +1,8 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
import traceback
|
||||
|
||||
# Django
|
||||
from django.db import connection
|
||||
from django.db.models.signals import (
|
||||
@ -23,10 +25,10 @@ from django.db.transaction import TransactionManagementError
|
||||
|
||||
|
||||
# AWX
|
||||
from awx.main.models.rbac import Resource, RolePermission, Role
|
||||
from awx.main.models.rbac import RolePermission, Role
|
||||
|
||||
|
||||
__all__ = ['AutoOneToOneField', 'ImplicitResourceField', 'ImplicitRoleField']
|
||||
__all__ = ['AutoOneToOneField', 'ImplicitRoleField']
|
||||
|
||||
|
||||
# Based on AutoOneToOneField from django-annoying:
|
||||
@ -59,53 +61,6 @@ class AutoOneToOneField(models.OneToOneField):
|
||||
|
||||
|
||||
|
||||
class ResourceFieldDescriptor(ReverseSingleRelatedObjectDescriptor):
|
||||
"""Descriptor for access to the object from its related class."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ResourceFieldDescriptor, self).__init__(*args, **kwargs)
|
||||
|
||||
def __get__(self, instance, instance_type=None):
|
||||
resource = super(ResourceFieldDescriptor, self).__get__(instance, instance_type)
|
||||
if resource:
|
||||
return resource
|
||||
if connection.needs_rollback:
|
||||
raise TransactionManagementError('Current transaction has failed, cannot create implicit resource')
|
||||
resource = Resource.objects.create(content_object=instance)
|
||||
setattr(instance, self.field.name, resource)
|
||||
if instance.pk:
|
||||
instance.save(update_fields=[self.field.name,])
|
||||
return resource
|
||||
|
||||
|
||||
class ImplicitResourceField(models.ForeignKey):
|
||||
"""Creates an associated resource object if one doesn't already exist"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs.setdefault('to', 'Resource')
|
||||
kwargs.setdefault('related_name', '+')
|
||||
kwargs.setdefault('null', 'True')
|
||||
super(ImplicitResourceField, self).__init__(*args, **kwargs)
|
||||
|
||||
def contribute_to_class(self, cls, name):
|
||||
super(ImplicitResourceField, self).contribute_to_class(cls, name)
|
||||
setattr(cls, self.name, ResourceFieldDescriptor(self))
|
||||
post_save.connect(self._post_save, cls, True)
|
||||
post_delete.connect(self._post_delete, cls, True)
|
||||
|
||||
def _post_save(self, instance, *args, **kwargs):
|
||||
# Ensures our resource object exists and that it's content_object
|
||||
# points back to our hosting instance.
|
||||
this_resource = getattr(instance, self.name)
|
||||
if not this_resource.object_id:
|
||||
this_resource.content_object = instance
|
||||
this_resource.save()
|
||||
|
||||
def _post_delete(self, instance, *args, **kwargs):
|
||||
getattr(instance, self.name).delete()
|
||||
|
||||
|
||||
|
||||
|
||||
def resolve_role_field(obj, field):
|
||||
ret = []
|
||||
@ -153,9 +108,13 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
||||
if connection.needs_rollback:
|
||||
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
|
||||
|
||||
role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance)
|
||||
if self.parent_role:
|
||||
|
||||
role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance)
|
||||
setattr(instance, self.field.name, role)
|
||||
if instance.pk:
|
||||
instance.save(update_fields=[self.field.name,])
|
||||
|
||||
if self.parent_role:
|
||||
# Add all non-null parent roles as parents
|
||||
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
|
||||
for path in paths:
|
||||
@ -165,14 +124,11 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
||||
parents = resolve_role_field(instance, path)
|
||||
for parent in parents:
|
||||
role.parents.add(parent)
|
||||
setattr(instance, self.field.name, role)
|
||||
if instance.pk:
|
||||
instance.save(update_fields=[self.field.name,])
|
||||
|
||||
if self.permissions is not None:
|
||||
permissions = RolePermission(
|
||||
role=role,
|
||||
resource=instance.resource
|
||||
resource=instance
|
||||
)
|
||||
|
||||
if 'all' in self.permissions and self.permissions['all']:
|
||||
@ -289,48 +245,29 @@ class ImplicitRoleField(models.ForeignKey):
|
||||
def _post_init(self, instance, *args, **kwargs):
|
||||
if not self.parent_role:
|
||||
return
|
||||
#if not hasattr(instance, self.name):
|
||||
# getattr(instance, self.name)
|
||||
|
||||
if not instance.pk:
|
||||
return
|
||||
|
||||
self._calc_original_parents(instance)
|
||||
|
||||
def _calc_original_parents(self, instance):
|
||||
if not hasattr(self, '__original_parent_roles'):
|
||||
setattr(self, '__original_parent_roles', []) # do not just self.__original_parent_roles=[], it's not the same here
|
||||
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
|
||||
all_parents = set()
|
||||
original_parent_roles = set()
|
||||
for path in paths:
|
||||
if path.startswith("singleton:"):
|
||||
parents = [Role.singleton(path[10:])]
|
||||
else:
|
||||
parents = resolve_role_field(instance, path)
|
||||
for parent in parents:
|
||||
all_parents.add(parent)
|
||||
#role.parents.add(parent)
|
||||
self.__original_parent_roles = all_parents
|
||||
original_parent_roles.add(parent)
|
||||
setattr(self, '__original_parent_roles', original_parent_roles)
|
||||
|
||||
'''
|
||||
field_names = self.parent_role
|
||||
if type(field_names) is not list:
|
||||
field_names = [field_names]
|
||||
self.__original_values = {}
|
||||
for field_name in field_names:
|
||||
if field_name.startswith('singleton:'):
|
||||
continue
|
||||
first_field_name = field_name.split('.')[0]
|
||||
self.__original_values[first_field_name] = getattr(instance, first_field_name)
|
||||
'''
|
||||
else:
|
||||
print('WE DO NEED THIS')
|
||||
pass
|
||||
|
||||
def _post_save(self, instance, *args, **kwargs):
|
||||
def _post_save(self, instance, created, *args, **kwargs):
|
||||
# Ensure that our field gets initialized after our first save
|
||||
this_role = getattr(instance, self.name)
|
||||
if not this_role.object_id:
|
||||
# Ensure our ref back to our instance is set. This will not be set the
|
||||
# first time the object is saved because we create the role in our _post_init
|
||||
# but that happens before an id for the instance has been set (because it
|
||||
# hasn't been saved yet!). Now that everything has an id, we patch things
|
||||
# so the role references the instance.
|
||||
this_role.content_object = instance
|
||||
this_role.save()
|
||||
|
||||
# As object relations change, the role hierarchy might also change if the relations
|
||||
# that changed were referenced in our magic parent_role field. This code synchronizes
|
||||
@ -338,8 +275,12 @@ class ImplicitRoleField(models.ForeignKey):
|
||||
if not self.parent_role:
|
||||
return
|
||||
|
||||
if created:
|
||||
self._calc_original_parents(instance)
|
||||
return
|
||||
|
||||
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
|
||||
original_parents = self.__original_parent_roles
|
||||
original_parents = getattr(self, '__original_parent_roles')
|
||||
new_parents = set()
|
||||
for path in paths:
|
||||
if path.startswith("singleton:"):
|
||||
@ -356,7 +297,7 @@ class ImplicitRoleField(models.ForeignKey):
|
||||
this_role.parents.add(role)
|
||||
Role.unpause_role_ancestor_rebuilding()
|
||||
|
||||
self.__original_parent_roles = new_parents
|
||||
setattr(self, '__original_parent_roles', new_parents)
|
||||
|
||||
def _post_delete(self, instance, *args, **kwargs):
|
||||
this_role = getattr(instance, self.name)
|
||||
|
@ -18,26 +18,6 @@ class Migration(migrations.Migration):
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Resource',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('description', models.TextField(default=b'', blank=True)),
|
||||
('active', models.BooleanField(default=True, editable=False)),
|
||||
('name', models.CharField(max_length=512)),
|
||||
('object_id', models.PositiveIntegerField(default=None, null=True)),
|
||||
('content_type', models.ForeignKey(default=None, to='contenttypes.ContentType', null=True)),
|
||||
('created_by', models.ForeignKey(related_name="{u'class': 'resource', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('modified_by', models.ForeignKey(related_name="{u'class': 'resource', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_resources',
|
||||
'verbose_name_plural': 'resources',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Role',
|
||||
fields=[
|
||||
@ -68,6 +48,7 @@ class Migration(migrations.Migration):
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('object_id', models.PositiveIntegerField(default=None)),
|
||||
('create', models.IntegerField(default=0)),
|
||||
('read', models.IntegerField(default=0)),
|
||||
('write', models.IntegerField(default=0)),
|
||||
@ -76,7 +57,7 @@ class Migration(migrations.Migration):
|
||||
('execute', models.IntegerField(default=0)),
|
||||
('scm_update', models.IntegerField(default=0)),
|
||||
('use', models.IntegerField(default=0)),
|
||||
('resource', models.ForeignKey(related_name='permissions', to='main.Resource')),
|
||||
('content_type', models.ForeignKey(default=None, to='contenttypes.ContentType')),
|
||||
('role', models.ForeignKey(related_name='permissions', to='main.Role')),
|
||||
],
|
||||
options={
|
||||
@ -84,21 +65,32 @@ class Migration(migrations.Migration):
|
||||
'verbose_name_plural': 'permissions',
|
||||
},
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='towersettings',
|
||||
name='value',
|
||||
field=models.TextField(blank=True),
|
||||
migrations.CreateModel(
|
||||
name='UserResource',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('description', models.TextField(default=b'', blank=True)),
|
||||
('active', models.BooleanField(default=True, editable=False)),
|
||||
('name', models.CharField(max_length=512)),
|
||||
('admin_role', awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True')),
|
||||
('created_by', models.ForeignKey(related_name="{u'class': 'userresource', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('modified_by', models.ForeignKey(related_name="{u'class': 'userresource', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
|
||||
('user', awx.main.fields.AutoOneToOneField(related_name='resource', editable=False, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_user_resource',
|
||||
'verbose_name': 'user_resource',
|
||||
'verbose_name_plural': 'user_resources',
|
||||
},
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='owner_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='usage_role',
|
||||
@ -119,21 +111,11 @@ class Migration(migrations.Migration):
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='updater_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='host',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='admin_role',
|
||||
@ -149,21 +131,11 @@ class Migration(migrations.Migration):
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='updater_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventorysource',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='admin_role',
|
||||
@ -179,11 +151,6 @@ class Migration(migrations.Migration):
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='organization',
|
||||
name='admin_role',
|
||||
@ -199,11 +166,6 @@ class Migration(migrations.Migration):
|
||||
name='member_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='organization',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='admin_role',
|
||||
@ -219,11 +181,6 @@ class Migration(migrations.Migration):
|
||||
name='member_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='scm_update_role',
|
||||
@ -244,37 +201,12 @@ class Migration(migrations.Migration):
|
||||
name='member_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='team',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
|
||||
migrations.CreateModel(
|
||||
name='UserResource',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('description', models.TextField(default=b'', blank=True)),
|
||||
('active', models.BooleanField(default=True, editable=False)),
|
||||
('name', models.CharField(max_length=512)),
|
||||
('admin_role', awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True')),
|
||||
('created_by', models.ForeignKey(related_name="{u'class': 'userresource', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('modified_by', models.ForeignKey(related_name="{u'class': 'userresource', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('resource', awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True')),
|
||||
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
|
||||
('user', awx.main.fields.AutoOneToOneField(related_name='resource', editable=False, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_user_resource',
|
||||
'verbose_name': 'user_resource',
|
||||
'verbose_name_plural': 'user_resources',
|
||||
},
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name='userresource',
|
||||
unique_together=set([('user', 'admin_role')]),
|
||||
),
|
||||
|
||||
migrations.AlterIndexTogether(
|
||||
name='rolepermission',
|
||||
index_together=set([('content_type', 'object_id')]),
|
||||
),
|
||||
]
|
||||
|
@ -1,11 +1,13 @@
|
||||
# Django
|
||||
from django.db import models
|
||||
from django.db.models.aggregates import Max
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.contenttypes.fields import GenericRelation
|
||||
|
||||
# AWX
|
||||
from awx.main.models.rbac import Resource
|
||||
from awx.main.fields import ImplicitResourceField
|
||||
from awx.main.models.rbac import (
|
||||
get_user_permissions_on_resource,
|
||||
get_role_permissions_on_resource,
|
||||
)
|
||||
|
||||
|
||||
__all__ = ['ResourceMixin']
|
||||
@ -15,7 +17,7 @@ class ResourceMixin(models.Model):
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
resource = ImplicitResourceField()
|
||||
role_permissions = GenericRelation('main.RolePermission')
|
||||
|
||||
@classmethod
|
||||
def accessible_objects(cls, user, permissions):
|
||||
@ -31,19 +33,46 @@ class ResourceMixin(models.Model):
|
||||
`myresource.get_permissions(user)`.
|
||||
'''
|
||||
|
||||
qs = Resource.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(cls),
|
||||
permissions__role__ancestors__members=user
|
||||
qs = cls.objects.filter(
|
||||
role_permissions__role__ancestors__members=user
|
||||
)
|
||||
for perm in permissions:
|
||||
qs = qs.annotate(**{'max_' + perm: Max('permissions__' + perm)})
|
||||
qs = qs.annotate(**{'max_' + perm: Max('role_permissions__' + perm)})
|
||||
qs = qs.filter(**{'max_' + perm: int(permissions[perm])})
|
||||
|
||||
return cls.objects.filter(resource__in=qs)
|
||||
#return cls.objects.filter(resource__in=qs)
|
||||
return qs
|
||||
|
||||
|
||||
def get_permissions(self, user):
|
||||
return self.resource.get_permissions(user)
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a user has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to the roles that are applicable for the given
|
||||
user.
|
||||
|
||||
In example, if a user has been granted read access through a permission
|
||||
on one role and write access through a permission on a separate role,
|
||||
the returned dict will denote that the user has both read and write
|
||||
access.
|
||||
'''
|
||||
|
||||
return get_user_permissions_on_resource(self, user)
|
||||
|
||||
|
||||
def get_role_permissions(self, role):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a role has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to either the role or any descendents of that role.
|
||||
'''
|
||||
|
||||
return get_role_permissions_on_resource(self, role)
|
||||
|
||||
|
||||
def accessible_by(self, user, permissions):
|
||||
'''
|
||||
|
@ -19,7 +19,8 @@ from awx.main.models.base import * # noqa
|
||||
__all__ = [
|
||||
'Role',
|
||||
'RolePermission',
|
||||
'Resource',
|
||||
'get_user_permissions_on_resource',
|
||||
'get_role_permissions_on_resource',
|
||||
'ROLE_SINGLETON_SYSTEM_ADMINISTRATOR',
|
||||
'ROLE_SINGLETON_SYSTEM_AUDITOR',
|
||||
]
|
||||
@ -120,26 +121,6 @@ class Role(CommonModelNameNotUnique):
|
||||
for child in self.children.all():
|
||||
child.rebuild_role_ancestor_list()
|
||||
|
||||
def grant(self, resource, permissions):
|
||||
# take either the raw Resource or something that includes the ResourceMixin
|
||||
resource = resource if type(resource) is Resource else resource.resource
|
||||
|
||||
if 'all' in permissions and permissions['all']:
|
||||
del permissions['all']
|
||||
permissions['create'] = True
|
||||
permissions['read'] = True
|
||||
permissions['write'] = True
|
||||
permissions['update'] = True
|
||||
permissions['delete'] = True
|
||||
permissions['scm_update'] = True
|
||||
permissions['use'] = True
|
||||
permissions['execute'] = True
|
||||
|
||||
permission = RolePermission(role=self, resource=resource)
|
||||
for k in permissions:
|
||||
setattr(permission, k, int(permissions[k]))
|
||||
permission.save()
|
||||
|
||||
@staticmethod
|
||||
def visible_roles(user):
|
||||
return Role.objects.filter(Q(descendents__in=user.roles.filter()) | Q(ancestors__in=user.roles.filter()))
|
||||
@ -149,14 +130,14 @@ class Role(CommonModelNameNotUnique):
|
||||
try:
|
||||
return Role.objects.get(singleton_name=name)
|
||||
except Role.DoesNotExist:
|
||||
ret = Role(singleton_name=name, name=name)
|
||||
ret.save()
|
||||
ret = Role.objects.create(singleton_name=name, name=name)
|
||||
return ret
|
||||
|
||||
def is_ancestor_of(self, role):
|
||||
return role.ancestors.filter(id=self.id).exists()
|
||||
|
||||
|
||||
"""
|
||||
class Resource(CommonModelNameNotUnique):
|
||||
'''
|
||||
Role model
|
||||
@ -171,69 +152,7 @@ class Resource(CommonModelNameNotUnique):
|
||||
object_id = models.PositiveIntegerField(null=True, default=None)
|
||||
content_object = GenericForeignKey('content_type', 'object_id')
|
||||
|
||||
def get_permissions(self, user):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a user has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to the roles that are applicable for the given
|
||||
user.
|
||||
|
||||
In example, if a user has been granted read access through a permission
|
||||
on one role and write access through a permission on a separate role,
|
||||
the returned dict will denote that the user has both read and write
|
||||
access.
|
||||
'''
|
||||
|
||||
qs = user.__class__.objects.filter(id=user.id, roles__descendents__permissions__resource=self)
|
||||
|
||||
qs = qs.annotate(max_create = Max('roles__descendents__permissions__create'))
|
||||
qs = qs.annotate(max_read = Max('roles__descendents__permissions__read'))
|
||||
qs = qs.annotate(max_write = Max('roles__descendents__permissions__write'))
|
||||
qs = qs.annotate(max_update = Max('roles__descendents__permissions__update'))
|
||||
qs = qs.annotate(max_delete = Max('roles__descendents__permissions__delete'))
|
||||
qs = qs.annotate(max_scm_update = Max('roles__descendents__permissions__scm_update'))
|
||||
qs = qs.annotate(max_execute = Max('roles__descendents__permissions__execute'))
|
||||
qs = qs.annotate(max_use = Max('roles__descendents__permissions__use'))
|
||||
|
||||
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
|
||||
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
|
||||
|
||||
res = qs.all()
|
||||
if len(res):
|
||||
# strip away the 'max_' prefix
|
||||
return {k[4:]:v for k,v in res[0].items()}
|
||||
return None
|
||||
|
||||
def get_role_permissions(self, role):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a role has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to either the role or any descendents of that role.
|
||||
'''
|
||||
|
||||
qs = Role.objects.filter(id=role.id, descendents__permissions__resource=self)
|
||||
|
||||
qs = qs.annotate(max_create = Max('descendents__permissions__create'))
|
||||
qs = qs.annotate(max_read = Max('descendents__permissions__read'))
|
||||
qs = qs.annotate(max_write = Max('descendents__permissions__write'))
|
||||
qs = qs.annotate(max_update = Max('descendents__permissions__update'))
|
||||
qs = qs.annotate(max_delete = Max('descendents__permissions__delete'))
|
||||
qs = qs.annotate(max_scm_update = Max('descendents__permissions__scm_update'))
|
||||
qs = qs.annotate(max_execute = Max('descendents__permissions__execute'))
|
||||
qs = qs.annotate(max_use = Max('descendents__permissions__use'))
|
||||
|
||||
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
|
||||
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
|
||||
|
||||
res = qs.all()
|
||||
if len(res):
|
||||
# strip away the 'max_' prefix
|
||||
return {k[4:]:v for k,v in res[0].items()}
|
||||
return None
|
||||
"""
|
||||
|
||||
|
||||
class RolePermission(CreatedModifiedModel):
|
||||
@ -245,6 +164,9 @@ class RolePermission(CreatedModifiedModel):
|
||||
app_label = 'main'
|
||||
verbose_name_plural = _('permissions')
|
||||
db_table = 'main_rbac_permissions'
|
||||
index_together = [
|
||||
('content_type', 'object_id')
|
||||
]
|
||||
|
||||
role = models.ForeignKey(
|
||||
Role,
|
||||
@ -252,12 +174,10 @@ class RolePermission(CreatedModifiedModel):
|
||||
on_delete=models.CASCADE,
|
||||
related_name='permissions',
|
||||
)
|
||||
resource = models.ForeignKey(
|
||||
Resource,
|
||||
null=False,
|
||||
on_delete=models.CASCADE,
|
||||
related_name='permissions',
|
||||
)
|
||||
content_type = models.ForeignKey(ContentType, null=False, default=None)
|
||||
object_id = models.PositiveIntegerField(null=False, default=None)
|
||||
resource = GenericForeignKey('content_type', 'object_id')
|
||||
|
||||
create = models.IntegerField(default = 0)
|
||||
read = models.IntegerField(default = 0)
|
||||
write = models.IntegerField(default = 0)
|
||||
@ -266,3 +186,69 @@ class RolePermission(CreatedModifiedModel):
|
||||
execute = models.IntegerField(default = 0)
|
||||
scm_update = models.IntegerField(default = 0)
|
||||
use = models.IntegerField(default = 0)
|
||||
|
||||
|
||||
|
||||
def get_user_permissions_on_resource(resource, user):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a user has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to the roles that are applicable for the given
|
||||
user.
|
||||
|
||||
In example, if a user has been granted read access through a permission
|
||||
on one role and write access through a permission on a separate role,
|
||||
the returned dict will denote that the user has both read and write
|
||||
access.
|
||||
'''
|
||||
|
||||
qs = RolePermission.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(resource),
|
||||
object_id=resource.id,
|
||||
role__ancestors__in=user.roles.all()
|
||||
)
|
||||
|
||||
res = qs = qs.aggregate(
|
||||
create = Max('create'),
|
||||
read = Max('read'),
|
||||
write = Max('write'),
|
||||
update = Max('update'),
|
||||
delete = Max('delete'),
|
||||
scm_update = Max('scm_update'),
|
||||
execute = Max('execute'),
|
||||
use = Max('use')
|
||||
)
|
||||
if res['read'] is None:
|
||||
return None
|
||||
return res
|
||||
|
||||
def get_role_permissions_on_resource(resource, role):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a role has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to either the role or any descendents of that role.
|
||||
'''
|
||||
|
||||
qs = RolePermission.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(resource),
|
||||
object_id=resource.id,
|
||||
role__ancestors=role
|
||||
)
|
||||
|
||||
res = qs = qs.aggregate(
|
||||
create = Max('create'),
|
||||
read = Max('read'),
|
||||
write = Max('write'),
|
||||
update = Max('update'),
|
||||
delete = Max('delete'),
|
||||
scm_update = Max('scm_update'),
|
||||
execute = Max('execute'),
|
||||
use = Max('use')
|
||||
)
|
||||
if res['read'] is None:
|
||||
return None
|
||||
return res
|
||||
|
@ -1,6 +1,7 @@
|
||||
import mock # noqa
|
||||
import pytest
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.urlresolvers import reverse
|
||||
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
|
||||
|
||||
@ -53,8 +54,6 @@ def test_get_roles_list_user(organization, inventory, team, get, user):
|
||||
assert team.member_role.id not in role_hash
|
||||
|
||||
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cant_create_role(post, admin):
|
||||
"Ensure we can't create new roles through the api"
|
||||
@ -225,7 +224,7 @@ def test_get_role(get, admin, role):
|
||||
assert response.data['id'] == role.id
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_put_role(put, admin, role):
|
||||
def test_put_role_405(put, admin, role):
|
||||
url = reverse('api:role_detail', args=(role.id,))
|
||||
response = put(url, {'name': 'Some new name'}, admin)
|
||||
assert response.status_code == 405
|
||||
@ -233,7 +232,7 @@ def test_put_role(put, admin, role):
|
||||
#assert r.name == 'Some new name'
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_put_role_access_denied(put, alice, admin, role):
|
||||
def test_put_role_access_denied(put, alice, role):
|
||||
url = reverse('api:role_detail', args=(role.id,))
|
||||
response = put(url, {'name': 'Some new name'}, alice)
|
||||
assert response.status_code == 403 or response.status_code == 405
|
||||
@ -400,8 +399,10 @@ def test_role_children(get, team, admin, role):
|
||||
@pytest.mark.django_db
|
||||
def test_resource_access_list(get, team, admin, role):
|
||||
team.users.add(admin)
|
||||
url = reverse('api:resource_access_list', args=(team.resource.id,))
|
||||
content_type_id = ContentType.objects.get_for_model(team).pk
|
||||
url = reverse('api:resource_access_list', args=(content_type_id, team.id,))
|
||||
res = get(url, admin)
|
||||
print(res.data)
|
||||
assert res.status_code == 200
|
||||
|
||||
|
||||
@ -420,7 +421,6 @@ def test_ensure_rbac_fields_are_present(organization, get, admin):
|
||||
assert 'summary_fields' in org
|
||||
assert 'resource_id' in org
|
||||
assert org['resource_id'] > 0
|
||||
assert org['related']['resource'] != ''
|
||||
assert 'roles' in org['summary_fields']
|
||||
|
||||
org_role_response = get(org['summary_fields']['roles']['admin_role']['url'], admin)
|
||||
@ -434,7 +434,6 @@ def test_ensure_rbac_fields_are_present(organization, get, admin):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ensure_permissions_is_present(organization, get, user):
|
||||
#u = user('admin', True)
|
||||
url = reverse('api:organization_detail', args=(organization.id,))
|
||||
response = get(url, user('admin', True))
|
||||
assert response.status_code == 200
|
||||
@ -446,7 +445,6 @@ def test_ensure_permissions_is_present(organization, get, user):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ensure_role_summary_is_present(organization, get, user):
|
||||
#u = user('admin', True)
|
||||
url = reverse('api:organization_detail', args=(organization.id,))
|
||||
response = get(url, user('admin', True))
|
||||
assert response.status_code == 200
|
||||
|
@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from awx.main.models import (
|
||||
Role,
|
||||
Resource,
|
||||
RolePermission,
|
||||
Organization,
|
||||
)
|
||||
|
||||
@ -13,17 +13,27 @@ def test_auto_inheritance_by_children(organization, alice):
|
||||
B = Role.objects.create(name='B')
|
||||
A.members.add(alice)
|
||||
|
||||
|
||||
|
||||
assert organization.accessible_by(alice, {'read': True}) is False
|
||||
assert Organization.accessible_objects(alice, {'read': True}).count() == 0
|
||||
A.children.add(B)
|
||||
assert organization.accessible_by(alice, {'read': True}) is False
|
||||
assert Organization.accessible_objects(alice, {'read': True}).count() == 0
|
||||
A.children.add(organization.admin_role)
|
||||
assert organization.accessible_by(alice, {'read': True}) is True
|
||||
assert Organization.accessible_objects(alice, {'read': True}).count() == 1
|
||||
A.children.remove(organization.admin_role)
|
||||
assert organization.accessible_by(alice, {'read': True}) is False
|
||||
B.children.add(organization.admin_role)
|
||||
assert organization.accessible_by(alice, {'read': True}) is True
|
||||
B.children.remove(organization.admin_role)
|
||||
assert organization.accessible_by(alice, {'read': True}) is False
|
||||
assert Organization.accessible_objects(alice, {'read': True}).count() == 0
|
||||
|
||||
# We've had the case where our pre/post save init handlers in our field descriptors
|
||||
# end up creating a ton of role objects because of various not-so-obvious issues
|
||||
assert Role.objects.count() < 50
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@ -53,12 +63,29 @@ def test_permission_union(organization, alice):
|
||||
B.members.add(alice)
|
||||
|
||||
assert organization.accessible_by(alice, {'read': True, 'write': True}) is False
|
||||
A.grant(organization, {'read': True})
|
||||
RolePermission.objects.create(role=A, resource=organization, read=True)
|
||||
assert organization.accessible_by(alice, {'read': True, 'write': True}) is False
|
||||
B.grant(organization, {'write': True})
|
||||
RolePermission.objects.create(role=A, resource=organization, write=True)
|
||||
assert organization.accessible_by(alice, {'read': True, 'write': True}) is True
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_accessible_objects(organization, alice, bob):
|
||||
A = Role.objects.create(name='A')
|
||||
A.members.add(alice)
|
||||
B = Role.objects.create(name='B')
|
||||
B.members.add(alice)
|
||||
B.members.add(bob)
|
||||
|
||||
assert Organization.accessible_objects(alice, {'read': True, 'write': True}).count() == 0
|
||||
RolePermission.objects.create(role=A, resource=organization, read=True)
|
||||
assert Organization.accessible_objects(alice, {'read': True, 'write': True}).count() == 0
|
||||
assert Organization.accessible_objects(bob, {'read': True, 'write': True}).count() == 0
|
||||
RolePermission.objects.create(role=B, resource=organization, write=True)
|
||||
assert Organization.accessible_objects(alice, {'read': True, 'write': True}).count() == 1
|
||||
assert Organization.accessible_objects(bob, {'read': True, 'write': True}).count() == 0
|
||||
assert Organization.accessible_objects(bob, {'read': True, 'write': True}).count() == 0
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_team_symantics(organization, team, alice):
|
||||
assert organization.accessible_by(alice, {'read': True}) is False
|
||||
@ -110,32 +137,28 @@ def test_implicit_deletes(alice):
|
||||
delorg = Organization.objects.create(name='test-org')
|
||||
delorg.admin_role.members.add(alice)
|
||||
|
||||
resource_id = delorg.resource.id
|
||||
admin_role_id = delorg.admin_role.id
|
||||
auditor_role_id = delorg.auditor_role.id
|
||||
|
||||
assert Role.objects.filter(id=admin_role_id).count() == 1
|
||||
assert Role.objects.filter(id=auditor_role_id).count() == 1
|
||||
assert Resource.objects.filter(id=resource_id).count() == 1
|
||||
n_alice_roles = alice.roles.count()
|
||||
n_system_admin_children = Role.singleton('System Administrator').children.count()
|
||||
rp = RolePermission.objects.create(role=delorg.admin_role, resource=delorg, read=True)
|
||||
|
||||
delorg.delete()
|
||||
|
||||
assert Role.objects.filter(id=admin_role_id).count() == 0
|
||||
assert Role.objects.filter(id=auditor_role_id).count() == 0
|
||||
assert Resource.objects.filter(id=resource_id).count() == 0
|
||||
assert alice.roles.count() == (n_alice_roles - 1)
|
||||
assert RolePermission.objects.filter(id=rp.id).count() == 0
|
||||
assert Role.singleton('System Administrator').children.count() == (n_system_admin_children - 1)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_content_object(user):
|
||||
'Ensure our conent_object stuf seems to be working'
|
||||
'Ensure our content_object stuf seems to be working'
|
||||
|
||||
print('Creating organization')
|
||||
org = Organization.objects.create(name='test-org')
|
||||
print('Organizaiton id: %d resource: %d admin_role: %d' % (org.id, org.resource.id, org.admin_role.id))
|
||||
assert org.resource.content_object.id == org.id
|
||||
assert org.admin_role.content_object.id == org.id
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
Loading…
Reference in New Issue
Block a user