1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-23 17:34:17 +03:00

Adder utility "matcher" to understand better arguments on rest calls

This commit is contained in:
Adolfo Gómez García 2022-12-13 16:52:33 +01:00
parent 57d2c40947
commit 5c8afb4d0d
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 138 additions and 7 deletions

View File

@ -34,6 +34,7 @@ import logging
import typing
from uds.core.util import permissions
from uds.core.util.rest.tools import match
from uds import models
@ -129,13 +130,39 @@ class Permissions(Handler):
la = len(self._args)
perm = permissions.PermissionType.from_str(self._params.get('perm', '0'))
def add_user_permission(cls_param: str, obj_param: str, user_param: str) -> typing.List[typing.Dict]:
cls = Permissions.getClass(cls_param)
obj = cls.objects.get(uuid=obj_param)
user = models.User.objects.get(uuid=user_param)
permissions.addUserPermission(user, obj, perm)
return Permissions.permsToDict(permissions.getPermissions(obj))
def add_group_permission(cls_param: str, obj_param: str, group_param: str) -> typing.List[typing.Dict]:
cls = Permissions.getClass(cls_param)
obj = cls.objects.get(uuid=obj_param)
group = models.Group.objects.get(uuid=group_param)
permissions.addGroupPermission(group, obj, perm)
return Permissions.permsToDict(permissions.getPermissions(obj))
def revoke() -> typing.List[typing.Dict]:
for permId in self._params.get('items', []):
permissions.revokePermissionById(permId)
return []
def no_match() -> None:
raise RequestError('Invalid request')
# match is a helper function that will match the args with the given patterns
return match(self._args,
no_match,
(('<cls>', '<obl>', 'users', 'add', '<user>'), add_user_permission),
(('<cls>', '<obl>', 'groups', 'add', '<group>'), add_group_permission),
(('revoke', ), revoke)
)
if la == 5 and self._args[3] == 'add':
perm: permissions.PermissionType = {
'0': permissions.PermissionType.NONE,
'1': permissions.PermissionType.READ,
'2': permissions.PermissionType.MANAGEMENT,
'3': permissions.PermissionType.ALL,
}.get(self._params.get('perm', '0'), permissions.PermissionType.NONE)
cls = Permissions.getClass(self._args[0])

View File

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
logger = logging.getLogger(__name__)
T = typing.TypeVar('T', bound=typing.Any)
# We want to write something like this:
# (('<arg>', '<arg2>', 'literal', '<other_arg>', '<other_arg2>', 'literal2', ...), callback)
# Where callback is a function that will be called with the arguments in the order they are
# in the tuple, and the literals will be ignored
# So, for example, if we have a tuple like this:
# ('<sample>', '<arg_2>', 'literal', 'other_literal', '<argument>', 'literal2')
# The callback will be called with the arguments in the order they are in the tuple, so:
# callback(sample, arg_2, argument)
# And the literals will be ignored
def match(
arg_list: typing.Tuple[str, ...],
error: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Tuple[str, ...], typing.Callable[..., T]],
) -> typing.Any:
"""
Matches a list of arguments against a list of matchers.
The matchers are a list of tuples, where the first element is a tuple of strings
that will be used to match the arguments, and the second element is a function
that will be called with the arguments in the order they are in the tuple, and the
literals will be ignored
So, for example, if we have a tuple like this:
('<sample>', '<arg_2>', 'literal', 'other_literal', '<argument>', 'literal2')
The callback will be called with the arguments in the order they are in the tuple, so:
callback(sample, arg_2, argument)
And the literals will be ignored
"""
for matcher in args:
if len(arg_list) != len(matcher[0]):
continue
# Check if all the arguments match
match = True
for i, arg in enumerate(arg_list):
if matcher[0][i].startswith('<') and matcher[0][i].endswith('>'):
continue
if arg != matcher[0][i]:
match = False
break
if match:
# All the arguments match, call the callback
return matcher[1](*[arg for i, arg in enumerate(arg_list) if matcher[0][i].startswith('<') and matcher[0][i].endswith('>')])
logger.warning('No match found for %s with %s', arg_list, args)
# Invoke error callback
error()

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

@ -65,11 +65,27 @@ class PermissionType(enum.IntEnum):
PermissionType.ALL: _('All'),
}.get(self, _('None'))
@staticmethod
def from_str(value: str) -> 'PermissionType':
"""Returns the permission from a string"""
value = value.lower()
if value in ('0', 'none'):
return PermissionType.NONE
if value in ('1', 'read'):
return PermissionType.READ
if value in ('2', 'manage', 'management'):
return PermissionType.MANAGEMENT
if value in ('3', 'all', 'rw', 'readwrite', 'read/write'):
return PermissionType.ALL
# Unknown value, return NONE
return PermissionType.NONE
def includes(self, permission: 'PermissionType') -> bool:
"""Returns if the permission includes the given permission"""
return self.value >= permission.value
class Permissions(UUIDModel):
"""
An OS Manager represents a manager for responding requests for agents inside services.