1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-11 05:17:55 +03:00

Refactirizing permissions and objtype

This commit is contained in:
Adolfo Gómez García 2022-12-13 12:48:40 +01:00
parent e37a3cef2e
commit 84c7836e8c
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
23 changed files with 183 additions and 162 deletions

View File

@ -34,7 +34,7 @@
import typing
from uds.core.util import permissions
from uds.core.util import ot
from uds.core.util import objtype
from uds import models
from ...utils.test import UDSTestCase
@ -83,67 +83,67 @@ class PermissionsTest(UDSTestCase):
self.network = network_fixtures.createNetwork()
def doTestUserPermissions(self, obj, user: models.User):
permissions.addUserPermission(user, obj, permissions.PERMISSION_NONE)
permissions.addUserPermission(user, obj, permissions.PermissionType.PERMISSION_NONE)
self.assertEqual(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_NONE)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_NONE)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ),
user.is_admin,
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT),
user.is_admin,
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL),
user.is_admin,
)
# Add a new permission, must overwrite the previous one
permissions.addUserPermission(user, obj, permissions.PERMISSION_ALL)
permissions.addUserPermission(user, obj, permissions.PermissionType.PERMISSION_ALL)
self.assertEqual(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_ALL)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL)
)
# Again, with read
permissions.addUserPermission(user, obj, permissions.PERMISSION_READ)
permissions.addUserPermission(user, obj, permissions.PermissionType.PERMISSION_READ)
self.assertEqual(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_READ)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_READ)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ)
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_MANAGEMENT),
user.is_admin,
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL),
user.is_admin,
)
@ -154,57 +154,57 @@ class PermissionsTest(UDSTestCase):
def doTestGroupPermissions(self, obj, user: models.User):
group = user.groups.all()[0]
permissions.addGroupPermission(group, obj, permissions.PERMISSION_NONE)
permissions.addGroupPermission(group, obj, permissions.PermissionType.PERMISSION_NONE)
self.assertEqual(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_NONE)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_NONE)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
# Admins has all permissions ALWAYS
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ),
user.is_admin,
)
self.assertEqual(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL),
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL),
user.is_admin,
)
permissions.addGroupPermission(group, obj, permissions.PERMISSION_ALL)
permissions.addGroupPermission(group, obj, permissions.PermissionType.PERMISSION_ALL)
self.assertEqual(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_ALL)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL)
)
# Add user permission, DB must contain both an return ALL
permissions.addUserPermission(user, obj, permissions.PERMISSION_READ)
permissions.addUserPermission(user, obj, permissions.PermissionType.PERMISSION_READ)
self.assertEqual(models.Permissions.objects.count(), 2)
perm = models.Permissions.objects.all()[0]
self.assertEqual(perm.object_type, ot.getObjectType(obj))
self.assertEqual(perm.object_type, objtype.getObjectType(obj))
self.assertEqual(perm.object_id, obj.pk)
self.assertEqual(perm.permission, permissions.PERMISSION_ALL)
self.assertEqual(perm.permission, permissions.PermissionType.PERMISSION_ALL)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_NONE)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_READ)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_READ)
)
self.assertTrue(
permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL)
permissions.checkPermissions(user, obj, permissions.PermissionType.PERMISSION_ALL)
)
# Remove obj, permissions must have gone away

View File

@ -114,7 +114,7 @@ class RESTTestCase(test.UDSTransactionTestCase):
username=user.name,
password=user.name,
)
self.assertEqual(response['result'], 'ok', 'Login failed')
self.assertEqual(response['result'], 'ok', f'Login failed: {response}')
# Insert token into headers
self.client.add_header(AUTH_TOKEN_HEADER, response['token'])
return response['token']

View File

@ -85,5 +85,5 @@ class Accounts(ModelHandler):
return ''
def clear(self, item: Account):
self.ensureAccess(item, permissions.PERMISSION_MANAGEMENT)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
return item.usages.filter(user_service=None).delete()

View File

@ -86,7 +86,7 @@ class ActorTokens(ModelHandler):
raise RequestError('Delete need one and only one argument')
self.ensureAccess(
self.model(), permissions.PERMISSION_ALL, root=True
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
) # Must have write permissions to delete
try:

View File

@ -191,7 +191,7 @@ class Authenticators(ModelHandler):
# Custom "search" method
def search(self, item: Authenticator) -> typing.List[typing.Dict]:
self.ensureAccess(item, permissions.PERMISSION_READ)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_READ)
try:
type_ = self._params['type']
if type_ not in ('user', 'group'):

View File

@ -145,7 +145,7 @@ class Login(Handler):
'authSmallName', None
)
authName: typing.Optional[str] = self._params.get('auth', None)
platform: str = self._params.get('platform', self._request.os)
platform: str = self._params.get('platform', self._request.os.os.value[0])
username: str
password: str

View File

@ -274,7 +274,7 @@ class MetaPools(ModelHandler):
# Set fallback status
def setFallbackAccess(self, item: MetaPool):
self.ensureAccess(item, permissions.PERMISSION_MANAGEMENT)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
fallback = self._params.get('fallbackAccess')
logger.debug('Setting fallback of %s to %s', item.name, fallback)

View File

@ -246,7 +246,7 @@ class ActionsCalendars(DetailHandler):
logger.debug('Launching action')
uuid = processUuid(item)
calendarAction: CalendarAction = CalendarAction.objects.get(uuid=uuid)
self.ensureAccess(calendarAction, permissions.PERMISSION_MANAGEMENT)
self.ensureAccess(calendarAction, permissions.PermissionType.PERMISSION_MANAGEMENT)
logStr = "Launched scheduled action \"{},{},{},{},{}\" by {}".format(
calendarAction.calendar.name,
calendarAction.action,

View File

@ -88,6 +88,10 @@ class Permissions(Handler):
kind = 'user'
entity = perm.user
# If entity is None, it means that the permission is not valid anymore (user or group deleted on db manually?)
if not entity:
continue
res.append(
{
'id': perm.uuid,
@ -126,12 +130,12 @@ class Permissions(Handler):
la = len(self._args)
if la == 5 and self._args[3] == 'add':
perm: int = {
'0': permissions.PERMISSION_NONE,
'1': permissions.PERMISSION_READ,
'2': permissions.PERMISSION_MANAGEMENT,
'3': permissions.PERMISSION_ALL,
}.get(self._params.get('perm', '0'), permissions.PERMISSION_NONE)
perm: permissions.PermissionType = {
'0': permissions.PermissionType.PERMISSION_NONE,
'1': permissions.PermissionType.PERMISSION_READ,
'2': permissions.PermissionType.PERMISSION_MANAGEMENT,
'3': permissions.PermissionType.PERMISSION_ALL,
}.get(self._params.get('perm', '0'), permissions.PermissionType.PERMISSION_NONE)
cls = Permissions.getClass(self._args[0])

View File

@ -139,7 +139,7 @@ class Providers(ModelHandler):
for s in Service.objects.all():
try:
perm = permissions.getEffectivePermission(self._user, s)
if perm >= permissions.PERMISSION_READ:
if perm >= permissions.PermissionType.PERMISSION_READ:
yield DetailServices.serviceToDict(s, perm, True)
except Exception:
logger.exception('Passed service cause type is unknown')
@ -151,7 +151,7 @@ class Providers(ModelHandler):
try:
service = Service.objects.get(uuid=self._args[1])
perm = self.ensureAccess(
service.provider, permissions.PERMISSION_READ
service.provider, permissions.PermissionType.PERMISSION_READ
) # Ensures that we can read this item
return DetailServices.serviceToDict(service, perm, True)
except Exception:
@ -163,7 +163,7 @@ class Providers(ModelHandler):
Custom method that swaps maintenance mode state for a provider
:param item:
"""
self.ensureAccess(item, permissions.PERMISSION_MANAGEMENT)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
item.maintenance_mode = not item.maintenance_mode
item.save()
return self.item_as_dict(item)

View File

@ -329,7 +329,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
for i in service.deployedServices.all():
try:
self.ensureAccess(
i, permissions.PERMISSION_READ
i, permissions.PermissionType.PERMISSION_READ
) # Ensures access before listing...
res.append(
{

View File

@ -644,7 +644,7 @@ class ServicesPools(ModelHandler):
# Set fallback status
def setFallbackAccess(self, item: ServicePool):
self.ensureAccess(item, permissions.PERMISSION_MANAGEMENT)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_MANAGEMENT)
fallback = self._params.get('fallbackAccess')
if fallback != '':

View File

@ -188,7 +188,7 @@ class System(Handler):
raise AccessDenied()
# Check permission for pool..
if not permissions.checkPermissions(
self._user, typing.cast('Model', pool), permissions.PERMISSION_READ
self._user, typing.cast('Model', pool), permissions.PermissionType.PERMISSION_READ
):
raise AccessDenied()
if self._args[0] == 'stats':

View File

@ -168,7 +168,7 @@ class Tickets(Handler):
) # Some machines needs password, depending on configuration
groupIds: typing.List[str] = []
for groupName in tools.asList(self._params['groups']):
for groupName in tools.as_list(self._params['groups']):
try:
groupIds.append(auth.groups.get(name=groupName).uuid)
except Exception:
@ -225,7 +225,6 @@ class Tickets(Handler):
# For metapool, transport is ignored..
servicePoolId = 'M' + pool.uuid
transportId = 'meta'
except models.MetaPool.DoesNotExist:
pool = typing.cast(
@ -244,9 +243,9 @@ class Tickets(Handler):
except models.Authenticator.DoesNotExist:
return Tickets.result(error='Authenticator does not exists')
except models.ServicePool.DoesNotExist:
except models.ServicePool.DoesNotExist: # type: ignore # this is fine, is not the same as models.Authenticator.DoesNotExist
return Tickets.result(error='Service pool (or metapool) does not exists')
except models.Transport.DoesNotExist:
except models.Transport.DoesNotExist: # type: ignore # this is fine, is not the same as models.Authenticator.DoesNotExist
return Tickets.result(error='Transport does not exists')
except Exception as e:
return Tickets.result(error=str(e))

View File

@ -74,7 +74,7 @@ class TunnelTokens(ModelHandler):
raise RequestError('Delete need one and only one argument')
self.ensureAccess(
self.model(), permissions.PERMISSION_ALL, root=True
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
) # Must have write permissions to delete
try:

View File

@ -443,7 +443,7 @@ class Publications(DetailHandler):
if (
permissions.checkPermissions(
self._user, parent, permissions.PERMISSION_MANAGEMENT
self._user, parent, permissions.PermissionType.PERMISSION_MANAGEMENT
)
is False
):
@ -475,7 +475,7 @@ class Publications(DetailHandler):
"""
if (
permissions.checkPermissions(
self._user, parent, permissions.PERMISSION_MANAGEMENT
self._user, parent, permissions.PermissionType.PERMISSION_MANAGEMENT
)
is False
):

View File

@ -247,7 +247,7 @@ class BaseModelHandler(Handler):
return gui
def ensureAccess(
self, obj: models.Model, permission: int, root: bool = False
self, obj: models.Model, permission: permissions.PermissionType, root: bool = False
) -> int:
perm = permissions.getEffectivePermission(self._user, obj, root)
if perm < permission:
@ -773,7 +773,7 @@ class ModelHandler(BaseModelHandler):
# log related
def getLogs(self, item: models.Model) -> typing.List[typing.Dict]:
self.ensureAccess(item, permissions.PERMISSION_READ)
self.ensureAccess(item, permissions.PermissionType.PERMISSION_READ)
try:
return log.getLogs(item)
except Exception as e:
@ -864,9 +864,9 @@ class ModelHandler(BaseModelHandler):
# If we do not have access to parent to, at least, read...
if self._operation in ('put', 'post', 'delete'):
requiredPermission = permissions.PERMISSION_MANAGEMENT
requiredPermission = permissions.PermissionType.PERMISSION_MANAGEMENT
else:
requiredPermission = permissions.PERMISSION_READ
requiredPermission = permissions.PermissionType.PERMISSION_READ
if (
permissions.checkPermissions(self._user, item, requiredPermission)
@ -931,7 +931,7 @@ class ModelHandler(BaseModelHandler):
permissions.checkPermissions(
typing.cast('User', self._user),
item,
permissions.PERMISSION_READ,
permissions.PermissionType.PERMISSION_READ,
)
is False
):
@ -1010,7 +1010,7 @@ class ModelHandler(BaseModelHandler):
try:
val = self.model.objects.get(uuid=self._args[0].lower())
self.ensureAccess(val, permissions.PERMISSION_READ)
self.ensureAccess(val, permissions.PermissionType.PERMISSION_READ)
res = self.item_as_dict(val)
self.fillIntanceFields(val, res)
@ -1076,7 +1076,7 @@ class ModelHandler(BaseModelHandler):
# Here, self.model() indicates an "django model object with default params"
self.ensureAccess(
self.model(), permissions.PERMISSION_ALL, root=True
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
) # Must have write permissions to create, modify, etc..
try:
@ -1172,7 +1172,7 @@ class ModelHandler(BaseModelHandler):
raise RequestError('Delete need one and only one argument')
self.ensureAccess(
self.model(), permissions.PERMISSION_ALL, root=True
self.model(), permissions.PermissionType.PERMISSION_ALL, root=True
) # Must have write permissions to delete
try:

View File

@ -53,6 +53,11 @@ if typing.TYPE_CHECKING:
def loadModulesUrls() -> typing.List[typing.Any]:
"""Loads dipatcher modules urls to add to django urlpatterns
Returns:
typing.List[typing.Any]: List of urlpatterns to add to django urlpatterns
"""
logger.debug('Looking for dispatching modules')
if not patterns:
logger.debug('Looking for patterns')
@ -69,7 +74,7 @@ def loadModulesUrls() -> typing.List[typing.Any]:
for up in urlpatterns:
patterns.append(up)
except Exception:
logger.exception('Loading patterns')
logger.error('No patterns found in %s', fullModName)
except Exception:
logger.exception('Processing dispatchers loading')
@ -79,9 +84,19 @@ def loadModulesUrls() -> typing.List[typing.Any]:
def importModules(modName: str, *, packageName: typing.Optional[str] = None) -> None:
"""Dinamycally import children of package
Args:
modName (str): Name of the module to import
packageName (str, optional): Name of the package inside the module to import. Defaults to None. If None, the module itself is imported
Notes:
This function is used to dinamycally import all submodules inside a submodule (with optional package name).
"""
# Dinamycally import children of this package.
pkgpath = os.path.dirname(typing.cast(str, sys.modules[modName].__file__))
if packageName:
if packageName: # Append package name to path and module name
pkgpath = os.path.join(pkgpath, packageName)
modName = '{}.{}'.format(modName, packageName)
@ -102,13 +117,24 @@ def dynamicLoadAndRegisterPackages(
packageName: typing.Optional[str] = None,
checker: typing.Optional[typing.Callable[[typing.Type[V]], bool]] = None,
) -> None:
''' Loads all packages from a given package that are subclasses of the given type
Args:
adder (typing.Callable[[typing.Type[V]], None]): Function to use to add the objects, must support "insert" method
type_ (typing.Type[V]): Type of the objects to load
modName (str): Name of the package to load
packageName (str, optional): Name of the package inside the module to import. Defaults to None. If None, the module itself is imported
checker (typing.Callable[[typing.Type[V]], bool], optional): Function to use to check if the class is registrable. Defaults to None.
Notes:
The checker function must return True if the class is registrable, False otherwise.
Example:
def checker(cls: MyBaseclass) -> bool:
# Will receive all classes that are subclasses of MyBaseclass
return cls.__name__.startswith('MyClass')
'''
Loads all packages from a given package that are subclasses of the given type
param adder: Function to use to add the objects, must support "insert" method
param type_: Type of the objects to load
param modName: Name of the package to load
param checker: Function to use to check if the class is registrable
'''
# Ensures all modules under modName (and optionally packageName) are imported
importModules(modName, packageName=packageName)
checkFnc = checker or (lambda x: True)
@ -119,10 +145,10 @@ def dynamicLoadAndRegisterPackages(
clsSubCls = cls.__subclasses__()
if clsSubCls:
process(clsSubCls)
process(clsSubCls) # recursive add sub classes
if not checkFnc(cls):
logger.debug('Node is a base, skipping: %s', cls.__module__)
logger.debug('Node is a not accepted, skipping: %s.%s', cls.__module__, cls.__name__)
continue
logger.info(' - Registering %s.%s', cls.__module__, cls.__name__)
@ -138,11 +164,14 @@ def dynamicLoadAndRegisterModules(
type_: typing.Type[T],
modName: str,
) -> None:
'''
Loads all modules from a given package that are subclasses of the given type
param factory: Factory to use to create the objects, must support "insert" method
param type_: Type of the objects to load
param modName: Name of the package to load
''' Loads and registers all modules from a given package that are subclasses of the given type
This is an specialisation of dynamicLoadAndRegisterPackages that uses a ModuleFactory to register the modules
Args:
factory (ModuleFactory): Factory to use to create the objects, must support "insert" method
type_ (typing.Type[T]): Type of the objects to load
modName (str): Name of the package to load
'''
dynamicLoadAndRegisterPackages(
factory.insert, type_, modName, checker=lambda x: not x.isBase

View File

@ -35,8 +35,6 @@ import re
import logging
import typing
from .tools import DictAsObj
logger = logging.getLogger(__name__)

View File

@ -32,9 +32,14 @@
"""
import logging
import typing
import enum
from uds import models
from uds.core.util import ot
from uds.models.permissions import PermissionType
from uds.core.util import objtype
from django.utils.translation import gettext as _
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -42,31 +47,24 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Import names into namespace
PERMISSION_ALL = models.Permissions.PERMISSION_ALL
PERMISSION_READ = models.Permissions.PERMISSION_READ
PERMISSION_MANAGEMENT = models.Permissions.PERMISSION_MANAGEMENT
PERMISSION_NONE = models.Permissions.PERMISSION_NONE
def clean(obj: 'Model') -> None:
models.Permissions.cleanPermissions(ot.getObjectType(obj), obj.pk)
models.Permissions.cleanPermissions(objtype.getObjectType(obj), obj.pk)
def getPermissions(obj: 'Model') -> typing.List[models.Permissions]:
return list(
models.Permissions.enumeratePermissions(
object_type=ot.getObjectType(obj), object_id=obj.pk
object_type=objtype.getObjectType(obj), object_id=obj.pk
)
)
def getEffectivePermission(
user: 'models.User', obj: 'Model', root: bool = False
) -> int:
) -> PermissionType:
try:
if user.is_admin:
return PERMISSION_ALL
return PermissionType.PERMISSION_ALL
# Just check permissions for staff members
# root means for "object type" not for an object
@ -74,35 +72,39 @@ def getEffectivePermission(
return models.Permissions.getPermissions(
user=user,
groups=user.groups.all(),
object_type=ot.getObjectType(obj),
object_type=objtype.getObjectType(obj),
object_id=obj.pk,
)
return models.Permissions.getPermissions(
user=user, groups=user.groups.all(), object_type=ot.getObjectType(obj)
user=user, groups=user.groups.all(), object_type=objtype.getObjectType(obj)
)
except Exception:
return PERMISSION_NONE
return PermissionType.PERMISSION_NONE
def addUserPermission(
user: 'models.User', obj: 'Model', permission: int = PERMISSION_READ
user: 'models.User',
obj: 'Model',
permission: PermissionType = PermissionType.PERMISSION_READ,
):
# Some permissions added to some object types needs at least READ_PERMISSION on parent
models.Permissions.addPermission(
user=user,
object_type=ot.getObjectType(obj),
object_type=objtype.getObjectType(obj),
object_id=obj.pk,
permission=permission,
)
def addGroupPermission(
group: 'models.Group', obj: 'Model', permission: int = PERMISSION_READ
group: 'models.Group',
obj: 'Model',
permission: PermissionType = PermissionType.PERMISSION_READ,
):
models.Permissions.addPermission(
group=group,
object_type=ot.getObjectType(obj),
object_type=objtype.getObjectType(obj),
object_id=obj.pk,
permission=permission,
)
@ -111,16 +113,12 @@ def addGroupPermission(
def checkPermissions(
user: 'models.User',
obj: 'Model',
permission: int = PERMISSION_ALL,
permission: PermissionType = PermissionType.PERMISSION_ALL,
root: bool = False,
):
return getEffectivePermission(user, obj, root) >= permission
def getPermissionName(perm: int) -> str:
return models.Permissions.permissionAsString(perm)
def revokePermissionById(permUUID: str) -> None:
"""Revokes a permision by its uuid
@ -131,4 +129,5 @@ def revokePermissionById(permUUID: str) -> None:
try:
models.Permissions.objects.get(uuid=permUUID).delete()
except Exception:
pass
# no pemission found, log it
logger.warning('Permission %s not found', permUUID)

View File

@ -28,7 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import sys
import os
@ -44,27 +44,6 @@ import django.template.defaultfilters as filters
from uds.core import services
class DictAsObj(dict):
"""
Returns a mix between a dict and an obj
Can be accesses as .xxxx or ['xxx']
"""
def __init__(
self, dct: typing.Optional[typing.Dict[str, typing.Any]] = None, **kwargs
):
if dct:
self.__dict__.update(dct)
self.__dict__.update(kwargs)
def __getitem__(self, key):
return self.__dict__[key]
def __unicode__(self):
return ', '.join('{}={}'.format(v, self.__dict__[v]) for v in self.__dict__)
# pylint: disable=protected-access
class CaseInsensitiveDict(dict):
@classmethod
def _k(cls, key):
@ -113,9 +92,15 @@ class CaseInsensitiveDict(dict):
self.__setitem__(k, v)
def asList(value: typing.Any) -> typing.List[typing.Any]:
if isinstance(value, list):
return value
def as_list(value: typing.Any) -> typing.List[typing.Any]:
"""If value is not a list, returns a list with value as only element
Args:
value (typing.Any): Value to convert to list
Returns:
typing.List[typing.Any]: List with value as only element
"""
if isinstance(value, (bytes, str, int, float)):
return [value]
try:
@ -190,6 +175,7 @@ def checkValidBasename(baseName: str, length: int = -1) -> None:
gettext('The machine name can\'t be only numbers')
)
def removeControlCharacters(s: str) -> str:
"""
Removes control characters from an unicode string

View File

@ -30,6 +30,8 @@
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import enum
import logging
from django.utils.translation import gettext as _
@ -41,20 +43,31 @@ from .user import User
from .group import Group
from .util import getSqlDatetime
logger = logging.getLogger(__name__)
class PermissionType(enum.IntEnum):
PERMISSION_NONE = 0
PERMISSION_READ = 32
PERMISSION_MANAGEMENT = 64
PERMISSION_ALL = 96
def as_str(self) -> str:
"""Returns the permission as a string"""
return {
PermissionType.PERMISSION_NONE: _('None'),
PermissionType.PERMISSION_READ: _('Read'),
PermissionType.PERMISSION_MANAGEMENT: _('Manage'),
PermissionType.PERMISSION_ALL: _('All'),
}.get(self, _('None'))
class Permissions(UUIDModel):
"""
An OS Manager represents a manager for responding requests for agents inside services.
"""
# Allowed permissions
PERMISSION_NONE = 0
PERMISSION_READ = 32
PERMISSION_MANAGEMENT = 64
PERMISSION_ALL = 96
created = models.DateTimeField(db_index=True)
ends = models.DateTimeField(
@ -81,20 +94,13 @@ class Permissions(UUIDModel):
object_type = models.SmallIntegerField(default=-1, db_index=True)
object_id = models.IntegerField(default=None, db_index=True, null=True, blank=True)
permission = models.SmallIntegerField(default=PERMISSION_NONE, db_index=True)
permission = models.SmallIntegerField(
default=PermissionType.PERMISSION_NONE, db_index=True
)
# "fake" declarations for type checking
# objects: 'models.manager.Manager[Permissions]'
@staticmethod
def permissionAsString(perm: int) -> str:
return {
Permissions.PERMISSION_NONE: _('None'),
Permissions.PERMISSION_READ: _('Read'),
Permissions.PERMISSION_MANAGEMENT: _('Manage'),
Permissions.PERMISSION_ALL: _('All'),
}.get(perm, _('None'))
@staticmethod
def addPermission(**kwargs) -> 'Permissions':
"""
@ -116,7 +122,7 @@ class Permissions(UUIDModel):
object_id = kwargs.get('object_id', None)
permission = kwargs.get('permission', Permissions.PERMISSION_NONE)
permission = kwargs.get('permission', PermissionType.PERMISSION_NONE)
if user is not None:
q = Q(user=user)
@ -144,7 +150,7 @@ class Permissions(UUIDModel):
)
@staticmethod
def getPermissions(**kwargs) -> int:
def getPermissions(**kwargs) -> PermissionType:
"""
Retrieves the permission for a given object
It's mandatory to include at least object_type param
@ -177,9 +183,9 @@ class Permissions(UUIDModel):
0 # type: ignore # Slicing is not supported by pylance right now
]
logger.debug('Got permission %s', perm)
return perm.permission
return PermissionType(perm.permission)
except Exception: # DoesNotExists
return Permissions.PERMISSION_NONE
return PermissionType.PERMISSION_NONE
@staticmethod
def enumeratePermissions(object_type, object_id) -> 'models.QuerySet[Permissions]':
@ -204,7 +210,7 @@ class Permissions(UUIDModel):
@property
def permission_as_string(self) -> str:
return Permissions.permissionAsString(self.permission)
return PermissionType(self.permission).as_str()
def __str__(self) -> str:
return 'Permission {}, user {} group {} object_type {} object_id {} permission {}'.format(
@ -213,5 +219,5 @@ class Permissions(UUIDModel):
self.group,
self.object_type,
self.object_id,
Permissions.permissionAsString(self.permission),
PermissionType(self.permission).as_str(),
)