mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 06:51:10 +03:00
Pass request data to various functions to allow for extra validation to be implemented on top of what is there now.
This commit is contained in:
parent
4cd7405a37
commit
0ed275c3c8
@ -139,11 +139,11 @@ class BaseSubList(BaseList):
|
||||
raise PermissionDenied()
|
||||
|
||||
if self.__class__.parent_model != User:
|
||||
if not self.__class__.parent_model.can_user_attach(request.user, main, obj, self.__class__.relationship):
|
||||
if not self.__class__.parent_model.can_user_attach(request.user, main, obj, self.__class__.relationship, request.DATA):
|
||||
raise PermissionDenied()
|
||||
else:
|
||||
# FIXME: should generalize this
|
||||
if not UserHelper.can_user_attach(request.user, main, obj, self.__class__.relationship):
|
||||
if not UserHelper.can_user_attach(request.user, main, obj, self.__class__.relationship, request.DATA):
|
||||
raise PermissionDenied()
|
||||
|
||||
return Response(status=status.HTTP_201_CREATED, data=ser.data)
|
||||
@ -164,10 +164,10 @@ class BaseSubList(BaseList):
|
||||
if not 'disassociate' in request.DATA:
|
||||
if not request.user.is_superuser:
|
||||
if type(main) != User:
|
||||
if not self.__class__.parent_model.can_user_attach(request.user, main, sub, self.__class__.relationship):
|
||||
if not self.__class__.parent_model.can_user_attach(request.user, main, sub, self.__class__.relationship, request.DATA):
|
||||
raise PermissionDenied()
|
||||
else:
|
||||
if not UserHelper.can_user_attach(request.user, main, sub, self.__class__.relationship):
|
||||
if not UserHelper.can_user_attach(request.user, main, sub, self.__class__.relationship, request.DATA):
|
||||
raise PermissionDenied()
|
||||
|
||||
if sub in relationship.all():
|
||||
@ -237,11 +237,9 @@ class BaseDetail(generics.RetrieveUpdateDestroyAPIView):
|
||||
return self.__class__.model.can_user_read(request.user, obj)
|
||||
elif request.method in [ 'PUT' ]:
|
||||
if type(obj) == User:
|
||||
# FIXME: pass request.DATA to all of these and verify permissions on subobjects
|
||||
return UserHelper.can_user_administrate(request.user, obj)
|
||||
return UserHelper.can_user_administrate(request.user, obj, request.DATA)
|
||||
else:
|
||||
# FIXME: pass request.DATA to all of these and verify permission on subobjects
|
||||
return self.__class__.model.can_user_administrate(request.user, obj)
|
||||
return self.__class__.model.can_user_administrate(request.user, obj, request.DATA)
|
||||
return False
|
||||
|
||||
def put(self, request, *args, **kwargs):
|
||||
@ -269,7 +267,7 @@ class VariableBaseDetail(BaseDetail):
|
||||
if request.method == 'GET':
|
||||
return self.__class__.parent_model.can_user_read(request.user, through_obj)
|
||||
elif request.method in [ 'PUT' ]:
|
||||
return self.__class__.parent_model.can_user_administrate(request.user, through_obj)
|
||||
return self.__class__.parent_model.can_user_administrate(request.user, through_obj, request.DATA)
|
||||
return False
|
||||
|
||||
def put(self, request, *args, **kwargs):
|
||||
|
@ -78,7 +78,7 @@ class EditHelper(object):
|
||||
@classmethod
|
||||
def illegal_changes(cls, request, obj, model_class):
|
||||
''' have any illegal changes been made (for a PUT request)? '''
|
||||
can_admin = model_class.can_user_administrate(request.user, obj)
|
||||
can_admin = model_class.can_user_administrate(request.user, obj, request.DATA)
|
||||
if (not can_admin) or (can_admin == 'partial'):
|
||||
check_fields = model_class.admin_only_edit_fields
|
||||
changed = cls.fields_changed(check_fields, obj, request.DATA)
|
||||
@ -105,7 +105,7 @@ class UserHelper(object):
|
||||
admin_only_edit_fields = ('last_name', 'first_name', 'username', 'is_active', 'is_superuser')
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
''' a user can be administrated if they are themselves, or by org admins or superusers '''
|
||||
if user == obj:
|
||||
return 'partial'
|
||||
@ -118,7 +118,7 @@ class UserHelper(object):
|
||||
def can_user_read(cls, user, obj):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
matching_teams = user.teams.filter(users__in = [ user ]).count()
|
||||
return matching_teams or cls.can_user_administrate(user, obj)
|
||||
return matching_teams or cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
@ -128,16 +128,18 @@ class UserHelper(object):
|
||||
return matching_orgs
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type):
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
if type(sub_obj) != User:
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
rc = cls.can_user_administrate(user, obj)
|
||||
return rc
|
||||
rc = cls.can_user_administrate(user, obj, None)
|
||||
if not rc:
|
||||
return False
|
||||
return sub_obj.__class__.can_user_read(user, sub_obj)
|
||||
|
||||
@classmethod
|
||||
def can_user_unattach(cls, user, obj, sub_obj, relationship_type):
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class PrimordialModel(models.Model):
|
||||
'''
|
||||
@ -160,7 +162,7 @@ class PrimordialModel(models.Model):
|
||||
return unicode("%s-%s"% (self.name, self.id))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME: do we want a seperate method to override put? This is kind of general purpose
|
||||
raise exceptions.NotImplementedError()
|
||||
|
||||
@ -177,17 +179,24 @@ class PrimordialModel(models.Model):
|
||||
return user.is_superuser
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type):
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
''' whether you can add sub_obj to obj using the relationship type in a subobject view '''
|
||||
if type(sub_obj) != User:
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
rc = cls.can_user_administrate(user, obj)
|
||||
return rc
|
||||
rc = cls.can_user_administrate(user, obj, None)
|
||||
if not rc:
|
||||
return False
|
||||
|
||||
# in order to attach something you also be able to read what you are attaching
|
||||
if type(sub_obj) == User:
|
||||
return UserHelper.can_user_read(user, sub_obj)
|
||||
else:
|
||||
return sub_obj.__class__.can_user_read(user, sub_obj)
|
||||
|
||||
@classmethod
|
||||
def can_user_unattach(cls, user, obj, sub_obj, relationship):
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class CommonModel(PrimordialModel):
|
||||
''' a base model where the name is unique '''
|
||||
@ -271,7 +280,7 @@ class Organization(CommonModel):
|
||||
return user in obj.admins.all()
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME: super user checks should be higher up so we don't have to repeat them
|
||||
if user.is_superuser:
|
||||
return True
|
||||
@ -282,11 +291,11 @@ class Organization(CommonModel):
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
return cls.can_user_administrate(user,obj) or user in obj.users.all()
|
||||
return cls.can_user_administrate(user,obj,None) or user in obj.users.all()
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
@ -361,11 +370,11 @@ class Inventory(CommonModel):
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_ADMIN)
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type):
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
''' whether you can add sub_obj to obj using the relationship type in a subobject view '''
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
@ -445,7 +454,7 @@ class Group(CommonModelNameNotUnique):
|
||||
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# here this controls whether the user can attach subgroups
|
||||
return Inventory._has_permission_types(user, obj.inventory, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
@ -536,7 +545,7 @@ class Credential(CommonModelNameNotUnique):
|
||||
sudo_password = models.CharField(blank=True, default='', max_length=1024)
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if user == obj.user:
|
||||
@ -555,12 +564,12 @@ class Credential(CommonModelNameNotUnique):
|
||||
if obj.user is None and obj.team is None:
|
||||
# unassociated credentials may be marked deleted by anyone
|
||||
return True
|
||||
return cls.can_user_administrate(user,obj)
|
||||
return cls.can_user_administrate(user,obj,None)
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
@ -568,10 +577,10 @@ class Credential(CommonModelNameNotUnique):
|
||||
return True
|
||||
if 'user' in data:
|
||||
user_obj = User.objects.get(pk=data['user'])
|
||||
return UserHelper.can_user_administrate(user, user_obj)
|
||||
return UserHelper.can_user_administrate(user, user_obj, data)
|
||||
if 'team' in data:
|
||||
team_obj = Team.objects.get(pk=data['team'])
|
||||
return Team.can_user_administrate(user, team_obj)
|
||||
return Team.can_user_administrate(user, team_obj, data)
|
||||
|
||||
def get_absolute_url(self):
|
||||
import lib.urls
|
||||
@ -594,7 +603,7 @@ class Team(CommonModel):
|
||||
return reverse(lib.urls.views_TeamsDetail, args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME -- audit when this is called explicitly, if any
|
||||
if user.is_superuser:
|
||||
return True
|
||||
@ -604,7 +613,7 @@ class Team(CommonModel):
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
if cls.can_user_administrate(user, obj):
|
||||
if cls.can_user_administrate(user, obj, None):
|
||||
return True
|
||||
if obj.users.filter(pk__in = [ user.pk ]).count():
|
||||
return True
|
||||
@ -622,7 +631,7 @@ class Team(CommonModel):
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class Project(CommonModel):
|
||||
'''
|
||||
@ -641,7 +650,7 @@ class Project(CommonModel):
|
||||
return reverse(lib.urls.views_ProjectsDetail, args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj):
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if obj.created_by == user:
|
||||
@ -654,7 +663,7 @@ class Project(CommonModel):
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
if cls.can_user_administrate(user,obj):
|
||||
if cls.can_user_administrate(user, obj, None):
|
||||
return True
|
||||
# and also if I happen to be on a team inside the project
|
||||
# FIXME: add this too
|
||||
@ -662,7 +671,7 @@ class Project(CommonModel):
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj)
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
|
||||
class Permission(CommonModelNameNotUnique):
|
||||
|
@ -154,7 +154,6 @@ class CredentialSerializer(BaseSerializer):
|
||||
|
||||
def validate(self, attrs):
|
||||
''' some fields cannot be changed once written '''
|
||||
import epdb; epdb.st()
|
||||
if self.object is not None:
|
||||
# this is an update
|
||||
if self.object.user != attrs['user']:
|
||||
|
@ -80,7 +80,7 @@ class OrganizationsAuditTrailList(BaseSubList):
|
||||
''' to list tags in the organization, I must be a superuser or org admin '''
|
||||
organization = Organization.objects.get(pk=self.kwargs['pk'])
|
||||
if not (self.request.user.is_superuser or self.request.user in organization.admins.all()):
|
||||
# FIXME: use: organization.can_user_administrate(self.request.user)
|
||||
# FIXME: use: organization.can_user_administrate(...) ?
|
||||
raise PermissionDenied()
|
||||
return AuditTrail.objects.filter(organization_by_audit_trail__in = [ organization ])
|
||||
|
||||
@ -154,7 +154,7 @@ class OrganizationsTagsList(BaseSubList):
|
||||
''' to list tags in the organization, I must be a superuser or org admin '''
|
||||
organization = Organization.objects.get(pk=self.kwargs['pk'])
|
||||
if not (self.request.user.is_superuser or self.request.user in organization.admins.all()):
|
||||
# FIXME: use: organization.can_user_administrate(self.request.user)
|
||||
# FIXME: use: organization.can_user_administrate(...) ?
|
||||
raise PermissionDenied()
|
||||
return Tag.objects.filter(organization_by_tag__in = [ organization ])
|
||||
|
||||
@ -242,7 +242,7 @@ class TeamsCredentialsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
team = Team.objects.get(pk=self.kwargs['pk'])
|
||||
if not Team.can_user_administrate(self.request.user, team):
|
||||
if not Team.can_user_administrate(self.request.user, team, None):
|
||||
if not (self.request.user.is_superuser or self.request.user in team.users.all()):
|
||||
raise PermissionDenied()
|
||||
project_credentials = Credential.objects.filter(
|
||||
@ -357,7 +357,7 @@ class UsersTeamsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
user = User.objects.get(pk=self.kwargs['pk'])
|
||||
if not UserHelper.can_user_administrate(self.request.user, user):
|
||||
if not UserHelper.can_user_administrate(self.request.user, user, None):
|
||||
raise PermissionDenied()
|
||||
return Team.objects.filter(users__in = [ user ])
|
||||
|
||||
@ -373,7 +373,7 @@ class UsersProjectsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
user = User.objects.get(pk=self.kwargs['pk'])
|
||||
if not UserHelper.can_user_administrate(self.request.user, user):
|
||||
if not UserHelper.can_user_administrate(self.request.user, user, None):
|
||||
raise PermissionDenied()
|
||||
teams = user.teams.all()
|
||||
return Project.objects.filter(teams__in = teams)
|
||||
@ -391,7 +391,7 @@ class UsersCredentialsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
user = User.objects.get(pk=self.kwargs['pk'])
|
||||
if not UserHelper.can_user_administrate(self.request.user, user):
|
||||
if not UserHelper.can_user_administrate(self.request.user, user, None):
|
||||
raise PermissionDenied()
|
||||
project_credentials = Credential.objects.filter(
|
||||
team__users__in = [ user ]
|
||||
@ -410,7 +410,7 @@ class UsersOrganizationsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
user = User.objects.get(pk=self.kwargs['pk'])
|
||||
if not UserHelper.can_user_administrate(self.request.user, user):
|
||||
if not UserHelper.can_user_administrate(self.request.user, user, None):
|
||||
raise PermissionDenied()
|
||||
return Organization.objects.filter(users__in = [ user ])
|
||||
|
||||
@ -426,7 +426,7 @@ class UsersAdminOrganizationsList(BaseSubList):
|
||||
|
||||
def _get_queryset(self):
|
||||
user = User.objects.get(pk=self.kwargs['pk'])
|
||||
if not UserHelper.can_user_administrate(self.request.user, user):
|
||||
if not UserHelper.can_user_administrate(self.request.user, user, None):
|
||||
raise PermissionDenied()
|
||||
return Organization.objects.filter(admins__in = [ user ])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user