mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-13 08:58:35 +03:00
More on permissions refactor
This commit is contained in:
parent
c0c3e7af13
commit
57d2c40947
@ -83,95 +83,95 @@ class PermissionsTest(UDSTestCase):
|
||||
|
||||
def doTestUserPermissions(self, obj, user: models.User):
|
||||
permissions.addUserPermission(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 1)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, objtype.ObjectType.from_model(obj).type)
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_NONE)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.NONE)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.MANAGEMENT
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
|
||||
# Add a new permission, must overwrite the previous one
|
||||
permissions.addUserPermission(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 1)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj))
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.ALL)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.MANAGEMENT
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
)
|
||||
)
|
||||
|
||||
# Again, with read
|
||||
permissions.addUserPermission(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 1)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj))
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_READ)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.READ)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.MANAGEMENT
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
@ -184,79 +184,79 @@ class PermissionsTest(UDSTestCase):
|
||||
group = user.groups.all()[0]
|
||||
|
||||
permissions.addGroupPermission(
|
||||
group, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
group, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 1)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj))
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_NONE)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.NONE)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
# Admins has all permissions ALWAYS
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
self.assertEqual(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
),
|
||||
user.is_admin,
|
||||
)
|
||||
|
||||
permissions.addGroupPermission(
|
||||
group, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
group, obj, permissions.PermissionType.ALL
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 1)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj))
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.ALL)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
)
|
||||
)
|
||||
|
||||
# Add user permission, DB must contain both an return ALL
|
||||
|
||||
permissions.addUserPermission(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
self.assertEqual(models.Permissions.objects.count(), 2)
|
||||
perm = models.Permissions.objects.all()[0]
|
||||
self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj))
|
||||
self.assertEqual(perm.object_id, obj.pk)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
|
||||
self.assertEqual(perm.permission, permissions.PermissionType.ALL)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_NONE
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.NONE
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_READ
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.READ
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
permissions.checkPermissions(
|
||||
user, obj, permissions.PermissionType.PERMISSION_ALL
|
||||
permissions.hasAccess(
|
||||
user, obj, permissions.PermissionType.ALL
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -85,5 +85,5 @@ class Accounts(ModelHandler):
|
||||
return ''
|
||||
|
||||
def clear(self, item: Account):
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
|
||||
self.ensureAccess(item, permissions.PermissionType.MANAGEMENT)
|
||||
return item.usages.filter(user_service=None).delete()
|
||||
|
@ -86,7 +86,7 @@ class ActorTokens(ModelHandler):
|
||||
raise RequestError('Delete need one and only one argument')
|
||||
|
||||
self.ensureAccess(
|
||||
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
|
||||
self.model(), permissions.PermissionType.ALL, root=True
|
||||
) # Must have write permissions to delete
|
||||
|
||||
try:
|
||||
|
@ -192,7 +192,7 @@ class Authenticators(ModelHandler):
|
||||
|
||||
# Custom "search" method
|
||||
def search(self, item: Authenticator) -> typing.List[typing.Dict]:
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_READ)
|
||||
self.ensureAccess(item, permissions.PermissionType.READ)
|
||||
try:
|
||||
type_ = self._params['type']
|
||||
if type_ not in ('user', 'group'):
|
||||
|
@ -274,7 +274,7 @@ class MetaPools(ModelHandler):
|
||||
|
||||
# Set fallback status
|
||||
def setFallbackAccess(self, item: MetaPool):
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
|
||||
self.ensureAccess(item, permissions.PermissionType.MANAGEMENT)
|
||||
|
||||
fallback = self._params.get('fallbackAccess')
|
||||
logger.debug('Setting fallback of %s to %s', item.name, fallback)
|
||||
|
@ -246,7 +246,7 @@ class ActionsCalendars(DetailHandler):
|
||||
logger.debug('Launching action')
|
||||
uuid = processUuid(item)
|
||||
calendarAction: CalendarAction = CalendarAction.objects.get(uuid=uuid)
|
||||
self.ensureAccess(calendarAction, permissions.PermissionType.PERMISSION_MANAGEMENT)
|
||||
self.ensureAccess(calendarAction, permissions.PermissionType.MANAGEMENT)
|
||||
logStr = "Launched scheduled action \"{},{},{},{},{}\" by {}".format(
|
||||
calendarAction.calendar.name,
|
||||
calendarAction.action,
|
||||
|
@ -131,11 +131,11 @@ class Permissions(Handler):
|
||||
|
||||
if la == 5 and self._args[3] == 'add':
|
||||
perm: permissions.PermissionType = {
|
||||
'0': permissions.PermissionType.PERMISSION_NONE,
|
||||
'1': permissions.PermissionType.PERMISSION_READ,
|
||||
'2': permissions.PermissionType.PERMISSION_MANAGEMENT,
|
||||
'3': permissions.PermissionType.PERMISSION_ALL,
|
||||
}.get(self._params.get('perm', '0'), permissions.PermissionType.PERMISSION_NONE)
|
||||
'0': permissions.PermissionType.NONE,
|
||||
'1': permissions.PermissionType.READ,
|
||||
'2': permissions.PermissionType.MANAGEMENT,
|
||||
'3': permissions.PermissionType.ALL,
|
||||
}.get(self._params.get('perm', '0'), permissions.PermissionType.NONE)
|
||||
|
||||
cls = Permissions.getClass(self._args[0])
|
||||
|
||||
|
@ -139,7 +139,7 @@ class Providers(ModelHandler):
|
||||
for s in Service.objects.all():
|
||||
try:
|
||||
perm = permissions.getEffectivePermission(self._user, s)
|
||||
if perm >= permissions.PermissionType.PERMISSION_READ:
|
||||
if perm >= permissions.PermissionType.READ:
|
||||
yield DetailServices.serviceToDict(s, perm, True)
|
||||
except Exception:
|
||||
logger.exception('Passed service cause type is unknown')
|
||||
@ -151,7 +151,7 @@ class Providers(ModelHandler):
|
||||
try:
|
||||
service = Service.objects.get(uuid=self._args[1])
|
||||
perm = self.ensureAccess(
|
||||
service.provider, permissions.PermissionType.PERMISSION_READ
|
||||
service.provider, permissions.PermissionType.READ
|
||||
) # Ensures that we can read this item
|
||||
return DetailServices.serviceToDict(service, perm, True)
|
||||
except Exception:
|
||||
@ -163,7 +163,7 @@ class Providers(ModelHandler):
|
||||
Custom method that swaps maintenance mode state for a provider
|
||||
:param item:
|
||||
"""
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
|
||||
self.ensureAccess(item, permissions.PermissionType.MANAGEMENT)
|
||||
item.maintenance_mode = not item.maintenance_mode
|
||||
item.save()
|
||||
return self.item_as_dict(item)
|
||||
|
@ -329,7 +329,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
|
||||
for i in service.deployedServices.all():
|
||||
try:
|
||||
self.ensureAccess(
|
||||
i, permissions.PermissionType.PERMISSION_READ
|
||||
i, permissions.PermissionType.READ
|
||||
) # Ensures access before listing...
|
||||
res.append(
|
||||
{
|
||||
|
@ -644,7 +644,7 @@ class ServicesPools(ModelHandler):
|
||||
|
||||
# Set fallback status
|
||||
def setFallbackAccess(self, item: ServicePool):
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
|
||||
self.ensureAccess(item, permissions.PermissionType.MANAGEMENT)
|
||||
|
||||
fallback = self._params.get('fallbackAccess')
|
||||
if fallback != '':
|
||||
|
@ -187,8 +187,8 @@ class System(Handler):
|
||||
if not pool and not self._user.is_admin:
|
||||
raise AccessDenied()
|
||||
# Check permission for pool..
|
||||
if not permissions.checkPermissions(
|
||||
self._user, typing.cast('Model', pool), permissions.PermissionType.PERMISSION_READ
|
||||
if not permissions.hasAccess(
|
||||
self._user, typing.cast('Model', pool), permissions.PermissionType.READ
|
||||
):
|
||||
raise AccessDenied()
|
||||
if self._args[0] == 'stats':
|
||||
|
@ -74,7 +74,7 @@ class TunnelTokens(ModelHandler):
|
||||
raise RequestError('Delete need one and only one argument')
|
||||
|
||||
self.ensureAccess(
|
||||
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
|
||||
self.model(), permissions.PermissionType.ALL, root=True
|
||||
) # Must have write permissions to delete
|
||||
|
||||
try:
|
||||
|
@ -442,8 +442,8 @@ class Publications(DetailHandler):
|
||||
changeLog = self._params['changelog'] if 'changelog' in self._params else None
|
||||
|
||||
if (
|
||||
permissions.checkPermissions(
|
||||
self._user, parent, permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
permissions.hasAccess(
|
||||
self._user, parent, permissions.PermissionType.MANAGEMENT
|
||||
)
|
||||
is False
|
||||
):
|
||||
@ -474,8 +474,8 @@ class Publications(DetailHandler):
|
||||
:param uuid: uuid of the publication
|
||||
"""
|
||||
if (
|
||||
permissions.checkPermissions(
|
||||
self._user, parent, permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
permissions.hasAccess(
|
||||
self._user, parent, permissions.PermissionType.MANAGEMENT
|
||||
)
|
||||
is False
|
||||
):
|
||||
|
@ -248,11 +248,9 @@ class BaseModelHandler(Handler):
|
||||
|
||||
def ensureAccess(
|
||||
self, obj: models.Model, permission: permissions.PermissionType, root: bool = False
|
||||
) -> int:
|
||||
perm = permissions.getEffectivePermission(self._user, obj, root)
|
||||
if perm < permission:
|
||||
) -> None:
|
||||
if not permissions.hasAccess(self._user, obj, permission, root):
|
||||
raise self.accessDenied()
|
||||
return perm
|
||||
|
||||
def typeInfo(self, type_: typing.Type['Module']) -> typing.Dict[str, typing.Any]:
|
||||
"""
|
||||
@ -773,7 +771,7 @@ class ModelHandler(BaseModelHandler):
|
||||
|
||||
# log related
|
||||
def getLogs(self, item: models.Model) -> typing.List[typing.Dict]:
|
||||
self.ensureAccess(item, permissions.PermissionType.PERMISSION_READ)
|
||||
self.ensureAccess(item, permissions.PermissionType.READ)
|
||||
try:
|
||||
return log.getLogs(item)
|
||||
except Exception as e:
|
||||
@ -864,12 +862,12 @@ class ModelHandler(BaseModelHandler):
|
||||
# If we do not have access to parent to, at least, read...
|
||||
|
||||
if self._operation in ('put', 'post', 'delete'):
|
||||
requiredPermission = permissions.PermissionType.PERMISSION_MANAGEMENT
|
||||
requiredPermission = permissions.PermissionType.MANAGEMENT
|
||||
else:
|
||||
requiredPermission = permissions.PermissionType.PERMISSION_READ
|
||||
requiredPermission = permissions.PermissionType.READ
|
||||
|
||||
if (
|
||||
permissions.checkPermissions(self._user, item, requiredPermission)
|
||||
permissions.hasAccess(self._user, item, requiredPermission)
|
||||
is False
|
||||
):
|
||||
logger.debug(
|
||||
@ -928,10 +926,10 @@ class ModelHandler(BaseModelHandler):
|
||||
for item in query:
|
||||
try:
|
||||
if (
|
||||
permissions.checkPermissions(
|
||||
permissions.hasAccess(
|
||||
typing.cast('User', self._user),
|
||||
item,
|
||||
permissions.PermissionType.PERMISSION_READ,
|
||||
permissions.PermissionType.READ,
|
||||
)
|
||||
is False
|
||||
):
|
||||
@ -1010,7 +1008,7 @@ class ModelHandler(BaseModelHandler):
|
||||
try:
|
||||
val = self.model.objects.get(uuid=self._args[0].lower())
|
||||
|
||||
self.ensureAccess(val, permissions.PermissionType.PERMISSION_READ)
|
||||
self.ensureAccess(val, permissions.PermissionType.READ)
|
||||
|
||||
res = self.item_as_dict(val)
|
||||
self.fillIntanceFields(val, res)
|
||||
@ -1067,7 +1065,6 @@ class ModelHandler(BaseModelHandler):
|
||||
Processes a PUT request
|
||||
"""
|
||||
logger.debug('method PUT for %s, %s', self.__class__.__name__, self._args)
|
||||
self._params['_request'] = self._request
|
||||
|
||||
deleteOnError = False
|
||||
|
||||
@ -1076,7 +1073,7 @@ class ModelHandler(BaseModelHandler):
|
||||
|
||||
# Here, self.model() indicates an "django model object with default params"
|
||||
self.ensureAccess(
|
||||
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
|
||||
self.model(), permissions.PermissionType.ALL, root=True
|
||||
) # Must have write permissions to create, modify, etc..
|
||||
|
||||
try:
|
||||
@ -1172,7 +1169,7 @@ class ModelHandler(BaseModelHandler):
|
||||
raise RequestError('Delete need one and only one argument')
|
||||
|
||||
self.ensureAccess(
|
||||
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
|
||||
self.model(), permissions.PermissionType.ALL, root=True
|
||||
) # Must have write permissions to delete
|
||||
|
||||
try:
|
||||
|
@ -46,6 +46,7 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def clean(obj: 'Model') -> None:
|
||||
models.Permissions.cleanPermissions(objtype.ObjectType.from_model(obj).type, obj.pk)
|
||||
|
||||
@ -63,29 +64,31 @@ def getEffectivePermission(
|
||||
) -> PermissionType:
|
||||
try:
|
||||
if user.is_admin:
|
||||
return PermissionType.PERMISSION_ALL
|
||||
return PermissionType.ALL
|
||||
|
||||
# Just check permissions for staff members
|
||||
# root means for "object type" not for an object
|
||||
if root is False:
|
||||
return models.Permissions.getPermissions(
|
||||
object_type=objtype.ObjectType.from_model(obj),
|
||||
user=user,
|
||||
groups=user.groups.all(),
|
||||
object_type=objtype.ObjectType.from_model(obj).type,
|
||||
object_id=obj.pk,
|
||||
groups=user.groups.all(),
|
||||
)
|
||||
|
||||
return models.Permissions.getPermissions(
|
||||
user=user, groups=user.groups.all(), object_type=objtype.ObjectType.from_model(obj).type
|
||||
object_type=objtype.ObjectType.from_model(obj),
|
||||
user=user,
|
||||
groups=user.groups.all(),
|
||||
)
|
||||
except Exception:
|
||||
return PermissionType.PERMISSION_NONE
|
||||
return PermissionType.NONE
|
||||
|
||||
|
||||
def addUserPermission(
|
||||
user: 'models.User',
|
||||
obj: 'Model',
|
||||
permission: PermissionType = PermissionType.PERMISSION_READ,
|
||||
permission: PermissionType = PermissionType.READ,
|
||||
):
|
||||
# Some permissions added to some object types needs at least READ_PERMISSION on parent
|
||||
models.Permissions.addPermission(
|
||||
@ -99,7 +102,7 @@ def addUserPermission(
|
||||
def addGroupPermission(
|
||||
group: 'models.Group',
|
||||
obj: 'Model',
|
||||
permission: PermissionType = PermissionType.PERMISSION_READ,
|
||||
permission: PermissionType = PermissionType.READ,
|
||||
):
|
||||
models.Permissions.addPermission(
|
||||
group=group,
|
||||
@ -109,13 +112,13 @@ def addGroupPermission(
|
||||
)
|
||||
|
||||
|
||||
def checkPermissions(
|
||||
def hasAccess(
|
||||
user: 'models.User',
|
||||
obj: 'Model',
|
||||
permission: PermissionType = PermissionType.PERMISSION_ALL,
|
||||
permission: PermissionType = PermissionType.ALL,
|
||||
root: bool = False,
|
||||
):
|
||||
return getEffectivePermission(user, obj, root) >= permission
|
||||
return getEffectivePermission(user, obj, root).includes(permission)
|
||||
|
||||
|
||||
def revokePermissionById(permUUID: str) -> None:
|
||||
|
@ -45,22 +45,30 @@ from .util import getSqlDatetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from uds.models import User, Group
|
||||
from uds.core.util import objtype
|
||||
|
||||
|
||||
class PermissionType(enum.IntEnum):
|
||||
PERMISSION_NONE = 0
|
||||
PERMISSION_READ = 32
|
||||
PERMISSION_MANAGEMENT = 64
|
||||
PERMISSION_ALL = 96
|
||||
NONE = 0
|
||||
READ = 32
|
||||
MANAGEMENT = 64
|
||||
ALL = 96
|
||||
|
||||
def as_str(self) -> str:
|
||||
"""Returns the permission as a string"""
|
||||
return {
|
||||
PermissionType.PERMISSION_NONE: _('None'),
|
||||
PermissionType.PERMISSION_READ: _('Read'),
|
||||
PermissionType.PERMISSION_MANAGEMENT: _('Manage'),
|
||||
PermissionType.PERMISSION_ALL: _('All'),
|
||||
PermissionType.NONE: _('None'),
|
||||
PermissionType.READ: _('Read'),
|
||||
PermissionType.MANAGEMENT: _('Manage'),
|
||||
PermissionType.ALL: _('All'),
|
||||
}.get(self, _('None'))
|
||||
|
||||
def includes(self, permission: 'PermissionType') -> bool:
|
||||
"""Returns if the permission includes the given permission"""
|
||||
return self.value >= permission.value
|
||||
|
||||
|
||||
class Permissions(UUIDModel):
|
||||
"""
|
||||
@ -95,7 +103,7 @@ class Permissions(UUIDModel):
|
||||
object_id = models.IntegerField(default=None, db_index=True, null=True, blank=True)
|
||||
|
||||
permission = models.SmallIntegerField(
|
||||
default=PermissionType.PERMISSION_NONE, db_index=True
|
||||
default=PermissionType.NONE, db_index=True
|
||||
)
|
||||
|
||||
# "fake" declarations for type checking
|
||||
@ -122,7 +130,7 @@ class Permissions(UUIDModel):
|
||||
|
||||
object_id = kwargs.get('object_id', None)
|
||||
|
||||
permission = kwargs.get('permission', PermissionType.PERMISSION_NONE)
|
||||
permission = kwargs.get('permission', PermissionType.NONE)
|
||||
|
||||
if user is not None:
|
||||
q = Q(user=user)
|
||||
@ -150,7 +158,12 @@ class Permissions(UUIDModel):
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def getPermissions(**kwargs) -> PermissionType:
|
||||
def getPermissions(
|
||||
object_type: 'objtype.ObjectType',
|
||||
object_id: typing.Optional[int] = None,
|
||||
user: typing.Optional['User'] = None,
|
||||
groups: typing.Optional[typing.Iterable['Group']] = None,
|
||||
) -> PermissionType:
|
||||
"""
|
||||
Retrieves the permission for a given object
|
||||
It's mandatory to include at least object_type param
|
||||
@ -160,23 +173,16 @@ class Permissions(UUIDModel):
|
||||
@param user: Optional, User (db object)
|
||||
@param groups: Optional List of db groups
|
||||
"""
|
||||
object_type = kwargs.get('object_type', None)
|
||||
if object_type is None:
|
||||
raise Exception('Needs at least the object_type field')
|
||||
|
||||
object_id = kwargs.get('object_id', None)
|
||||
|
||||
user = kwargs.get('user', None)
|
||||
groups = kwargs.get('groups', [])
|
||||
|
||||
if user is None and not groups:
|
||||
if not user and not groups:
|
||||
q = Q()
|
||||
else:
|
||||
q = Q(user=user) | Q(group__in=groups)
|
||||
q = Q(user=user)
|
||||
if groups:
|
||||
q |= Q(group__in=groups)
|
||||
|
||||
try:
|
||||
perm: Permissions = Permissions.objects.filter(
|
||||
Q(object_type=object_type),
|
||||
Q(object_type=object_type.type),
|
||||
Q(object_id=None) | Q(object_id=object_id),
|
||||
q,
|
||||
).order_by('-permission')[
|
||||
@ -185,7 +191,7 @@ class Permissions(UUIDModel):
|
||||
logger.debug('Got permission %s', perm)
|
||||
return PermissionType(perm.permission)
|
||||
except Exception: # DoesNotExists
|
||||
return PermissionType.PERMISSION_NONE
|
||||
return PermissionType.NONE
|
||||
|
||||
@staticmethod
|
||||
def enumeratePermissions(object_type, object_id) -> 'models.QuerySet[Permissions]':
|
||||
|
Loading…
x
Reference in New Issue
Block a user