mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-20 06:50:23 +03:00
Refactor access control roles to use UserRole constants for consistency
This commit is contained in:
parent
ee2262a779
commit
b41a1afd43
@ -29,17 +29,22 @@
|
||||
"""
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import dataclasses
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django import http
|
||||
from django.shortcuts import render
|
||||
from django.views.generic.base import View
|
||||
|
||||
from uds.core.auths import auth
|
||||
from uds.core import consts, types
|
||||
|
||||
from .dispatcher import Dispatcher
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
pass
|
||||
from uds.core import types
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -49,6 +54,41 @@ class Documentation(View):
|
||||
def dispatch(
|
||||
self, request: 'http.request.HttpRequest', *_args: typing.Any, **kwargs: typing.Any
|
||||
) -> 'http.HttpResponse':
|
||||
request = typing.cast('types.requests.ExtendedHttpRequest', request)
|
||||
if not request.user or not request.authorized:
|
||||
return auth.weblogout(request)
|
||||
|
||||
if not request.user.get_role().can_access(consts.UserRole.STAFF):
|
||||
return auth.weblogout(request)
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HelpInfo:
|
||||
level: int
|
||||
path: str
|
||||
text: str
|
||||
|
||||
help_data: list[HelpInfo] = []
|
||||
|
||||
def _process_node(node: 'types.rest.HelpNode', path: str, level: int) -> None:
|
||||
help_data.append(HelpInfo(level, path, node.help.text))
|
||||
|
||||
for child in node.children:
|
||||
_process_node(
|
||||
child,
|
||||
path + '/' + child.help.path,
|
||||
level + (0 if node.kind == types.rest.HelpNode.HelpNodeType.PATH else 1),
|
||||
)
|
||||
|
||||
_process_node(Dispatcher.base_handler_node.help_node(), '', 0)
|
||||
|
||||
response = render(
|
||||
request=request,
|
||||
template_name='uds/modern/documentation.html',
|
||||
context={'help': help_data},
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
service = Dispatcher.base_handler_node
|
||||
|
||||
return http.HttpResponseServerError(f'{service.tree()}', content_type="text/plain")
|
||||
|
||||
# return http.HttpResponseServerError(f'{service.tree()}', content_type="text/plain")
|
||||
|
@ -52,6 +52,7 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Handler:
|
||||
"""
|
||||
REST requests handler base class
|
||||
@ -63,13 +64,10 @@ class Handler:
|
||||
path: typing.ClassVar[typing.Optional[str]] = (
|
||||
None # Path for this method, so we can do /auth/login, /auth/logout, /auth/auths in a simple way
|
||||
)
|
||||
authenticated: typing.ClassVar[bool] = (
|
||||
True # By default, all handlers needs authentication. Will be overwriten if needs_admin or needs_staff,
|
||||
)
|
||||
needs_admin: typing.ClassVar[bool] = (
|
||||
False # By default, the methods will be accessible by anyone if nothing else indicated
|
||||
)
|
||||
needs_staff: typing.ClassVar[bool] = False # By default, staff
|
||||
|
||||
min_access_role: typing.ClassVar[consts.UserRole] = (
|
||||
consts.UserRole.USER
|
||||
) # By default, only users can access
|
||||
|
||||
# For implementing help
|
||||
# A list of pairs of (path, help) for subpaths on this handler
|
||||
@ -85,7 +83,9 @@ class Handler:
|
||||
# These are the "path" split by /, that is, the REST invocation arguments
|
||||
_args: list[str]
|
||||
_kwargs: dict[str, typing.Any] # This are the "path" split by /, that is, the REST invocation arguments
|
||||
_headers: dict[str, str] # Note: These are "output" headers, not input headers (input headers can be retrieved from request)
|
||||
_headers: dict[
|
||||
str, str
|
||||
] # Note: These are "output" headers, not input headers (input headers can be retrieved from request)
|
||||
_session: typing.Optional[SessionStore]
|
||||
_auth_token: typing.Optional[str]
|
||||
_user: 'User'
|
||||
@ -103,14 +103,6 @@ class Handler:
|
||||
*args: str,
|
||||
**kwargs: typing.Any,
|
||||
):
|
||||
logger.debug('Data: %s %s %s', self.__class__, self.needs_admin, self.authenticated)
|
||||
if (
|
||||
self.needs_admin or self.needs_staff
|
||||
) and not self.authenticated: # If needs_admin, must also be authenticated
|
||||
raise Exception(
|
||||
f'class {self.__class__} is not authenticated but has needs_admin or needs_staff set!!'
|
||||
)
|
||||
|
||||
self._request = request
|
||||
self._path = path
|
||||
self._operation = method
|
||||
@ -119,7 +111,8 @@ class Handler:
|
||||
self._kwargs = kwargs
|
||||
self._headers = {}
|
||||
self._auth_token = None
|
||||
if self.authenticated: # Only retrieve auth related data on authenticated handlers
|
||||
|
||||
if self.min_access_role.needs_authentication:
|
||||
try:
|
||||
self._auth_token = self._request.headers.get(consts.auth.AUTH_TOKEN_HEADER, '')
|
||||
self._session = SessionStore(session_key=self._auth_token)
|
||||
@ -132,11 +125,10 @@ class Handler:
|
||||
if self._auth_token is None:
|
||||
raise AccessDenied()
|
||||
|
||||
if self.needs_admin and not self.is_admin():
|
||||
self._user = self.get_user()
|
||||
if not self._user.can_access(self.min_access_role):
|
||||
raise AccessDenied()
|
||||
|
||||
if self.needs_staff and not self.is_staff_member():
|
||||
raise AccessDenied()
|
||||
try:
|
||||
self._user = self.get_user()
|
||||
except Exception as e:
|
||||
@ -158,10 +150,10 @@ class Handler:
|
||||
def header(self, header_name: str) -> typing.Optional[str]:
|
||||
"""
|
||||
Get's an specific header name from REST request
|
||||
|
||||
|
||||
Args:
|
||||
header_name: Name of header to retrieve
|
||||
|
||||
|
||||
Returns:
|
||||
Value of header or None if not found
|
||||
"""
|
||||
|
@ -89,12 +89,41 @@ class Accounts(ModelHandler):
|
||||
return self.add_default_fields([], ['name', 'comments', 'tags'])
|
||||
|
||||
def timemark(self, item: 'Model') -> typing.Any:
|
||||
"""
|
||||
API:
|
||||
Description:
|
||||
Generates a time mark associated with the account.
|
||||
This is useful to easily identify when the account data was last updated.
|
||||
(For example, one user enters an service, we get the usage data and "timemark" it, later read again
|
||||
and we can identify that all data before this timemark has already been processed)
|
||||
|
||||
Parameters:
|
||||
- item: Account
|
||||
Description of the item parameter
|
||||
|
||||
Response:
|
||||
200: All fine, no data returned
|
||||
"""
|
||||
item = ensure.is_instance(item, Account)
|
||||
item.time_mark = datetime.datetime.now()
|
||||
item.save()
|
||||
return ''
|
||||
|
||||
def clear(self, item: 'Model') -> typing.Any:
|
||||
"""
|
||||
Api documentation for the method. From here, will be used by the documentation generator
|
||||
Always starts with API:
|
||||
API:
|
||||
Description:
|
||||
Clears all usage associated with the account
|
||||
|
||||
Parameters:
|
||||
- item: Account
|
||||
Description of the item parameter
|
||||
|
||||
Response:
|
||||
200: All fine, no data returned
|
||||
"""
|
||||
item = ensure.is_instance(item, Account)
|
||||
self.ensure_has_access(item, uds.core.types.permissions.PermissionType.MANAGEMENT)
|
||||
return item.usages.filter(user_service=None).delete()
|
||||
|
@ -145,7 +145,7 @@ def clear_failed_ip_counter(request: 'ExtendedHttpRequest') -> None:
|
||||
|
||||
|
||||
class ActorV3Action(Handler):
|
||||
authenticated = False # Actor requests are not authenticated normally
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
path = 'actor/v3'
|
||||
|
||||
@staticmethod
|
||||
@ -291,8 +291,7 @@ class Register(ActorV3Action):
|
||||
|
||||
"""
|
||||
|
||||
authenticated = True
|
||||
needs_staff = True
|
||||
min_access_role = consts.UserRole.STAFF
|
||||
|
||||
name = 'register'
|
||||
|
||||
|
@ -35,7 +35,7 @@ import typing
|
||||
|
||||
from django.core.cache import caches
|
||||
|
||||
from uds.core import exceptions
|
||||
from uds.core import exceptions, consts
|
||||
from uds.core.util.cache import Cache as UCache
|
||||
from uds.REST import Handler
|
||||
|
||||
@ -44,8 +44,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Enclosed methods under /cache path
|
||||
class Cache(Handler):
|
||||
authenticated = True
|
||||
needs_admin = True
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
def get(self) -> typing.Any:
|
||||
"""
|
||||
|
@ -58,7 +58,7 @@ class Client(Handler):
|
||||
Processes Client requests
|
||||
"""
|
||||
|
||||
authenticated = False # Client requests are not authenticated
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
|
||||
@staticmethod
|
||||
def result(
|
||||
|
@ -33,6 +33,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
import typing
|
||||
import logging
|
||||
|
||||
from uds.core import consts
|
||||
from uds.core.util.config import Config as CfgConfig
|
||||
from uds.REST import Handler
|
||||
|
||||
@ -42,7 +43,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Enclosed methods under /config path
|
||||
class Config(Handler):
|
||||
needs_admin = True # By default, staff is lower level needed
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
def get(self) -> typing.Any:
|
||||
return CfgConfig.get_config_values(self.is_admin())
|
||||
|
@ -34,7 +34,7 @@ import datetime
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from uds.core import exceptions, types
|
||||
from uds.core import exceptions, types, consts
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.exceptions.services import ServiceNotReadyError
|
||||
@ -51,9 +51,7 @@ class Connection(Handler):
|
||||
Processes actor requests
|
||||
"""
|
||||
|
||||
authenticated = True # Actor requests are not authenticated
|
||||
needs_admin = False
|
||||
needs_staff = False
|
||||
min_access_role = consts.UserRole.USER
|
||||
|
||||
@staticmethod
|
||||
def result(
|
||||
|
@ -32,7 +32,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import logging
|
||||
|
||||
from uds.core import exceptions, types
|
||||
from uds.core import exceptions, types, consts
|
||||
from uds.core.ui import gui
|
||||
from uds.REST import Handler
|
||||
|
||||
@ -43,8 +43,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class Callback(Handler):
|
||||
path = 'gui'
|
||||
authenticated = True
|
||||
needs_staff = True
|
||||
|
||||
min_access_role = consts.UserRole.STAFF
|
||||
|
||||
def get(self) -> types.ui.CallbackResultType:
|
||||
if len(self._args) != 1:
|
||||
|
@ -56,7 +56,7 @@ class Login(Handler):
|
||||
"""
|
||||
|
||||
path = 'auth'
|
||||
authenticated = False # Public method
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
|
||||
@staticmethod
|
||||
def result(
|
||||
@ -221,7 +221,7 @@ class Logout(Handler):
|
||||
|
||||
class Auths(Handler):
|
||||
path = 'auth'
|
||||
authenticated = False # By default, all handlers needs authentication
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
|
||||
def auths(self) -> collections.abc.Iterable[dict[str, typing.Any]]:
|
||||
all_param: bool = self._params.get('all', 'false').lower() == 'true'
|
||||
|
@ -109,8 +109,8 @@ class MetaPools(ModelHandler):
|
||||
]
|
||||
|
||||
custom_methods = [
|
||||
types.rest.ModelCustomMethod('setFallbackAccess', True),
|
||||
types.rest.ModelCustomMethod('getFallbackAccess', True),
|
||||
types.rest.ModelCustomMethod('set_fallback_access', True),
|
||||
types.rest.ModelCustomMethod('get_fallback_access', True),
|
||||
]
|
||||
|
||||
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
|
||||
|
@ -36,7 +36,7 @@ import typing
|
||||
|
||||
import uds.core.types.permissions
|
||||
from uds import models
|
||||
from uds.core import exceptions
|
||||
from uds.core import consts, exceptions
|
||||
from uds.core.util import permissions
|
||||
from uds.core.util.rest.tools import match
|
||||
from uds.REST import Handler
|
||||
@ -53,8 +53,7 @@ class Permissions(Handler):
|
||||
"""
|
||||
Processes permissions requests
|
||||
"""
|
||||
|
||||
needs_admin = True
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
@staticmethod
|
||||
def get_class(class_name: str) -> type['Model']:
|
||||
|
@ -63,9 +63,8 @@ class Reports(model.BaseModelHandler):
|
||||
"""
|
||||
Processes reports requests
|
||||
"""
|
||||
|
||||
needs_admin = True # By default, staff is lower level needed
|
||||
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
table_title = _('Available reports')
|
||||
table_fields = [
|
||||
{'group': {'title': _('Group')}},
|
||||
|
@ -138,14 +138,15 @@ class ServerRegisterBase(Handler):
|
||||
|
||||
|
||||
class ServerRegister(ServerRegisterBase):
|
||||
needs_staff = True
|
||||
min_access_role = consts.UserRole.STAFF
|
||||
|
||||
path = 'servers'
|
||||
name = 'register'
|
||||
|
||||
|
||||
# REST handlers for server actions
|
||||
class ServerTest(Handler):
|
||||
authenticated = False # Test is not authenticated, the auth is the token to test itself
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
|
||||
path = 'servers'
|
||||
name = 'test'
|
||||
@ -172,7 +173,7 @@ class ServerEvent(Handler):
|
||||
* log
|
||||
"""
|
||||
|
||||
authenticated = False # Actor requests are not authenticated normally
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
path = 'servers'
|
||||
name = 'event'
|
||||
|
||||
|
@ -58,8 +58,6 @@ class ServicesPoolGroups(ModelHandler):
|
||||
Handles the gallery REST interface
|
||||
"""
|
||||
|
||||
# needs_admin = True
|
||||
|
||||
path = 'gallery'
|
||||
model = ServicePoolGroup
|
||||
save_fields = ['name', 'comments', 'image_id', 'priority']
|
||||
|
@ -34,8 +34,7 @@ import logging
|
||||
import datetime
|
||||
import typing
|
||||
|
||||
from uds.core.types.rest import HelpPath
|
||||
from uds.core import types
|
||||
from uds.core import types, consts
|
||||
from uds.REST import Handler
|
||||
from uds import models
|
||||
from uds.core.util.stats import counters
|
||||
@ -45,11 +44,10 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Enclosed methods under /cache path
|
||||
class Stats(Handler):
|
||||
authenticated = True
|
||||
needs_admin = True
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
help_paths = [
|
||||
HelpPath('', 'Returns the last day usage statistics for all authenticators'),
|
||||
types.rest.HelpPath('', 'Returns the last day usage statistics for all authenticators'),
|
||||
]
|
||||
help_text = 'Provides access to usage statistics'
|
||||
|
||||
|
@ -38,8 +38,7 @@ import pickletools
|
||||
import typing
|
||||
|
||||
from uds import models
|
||||
from uds.core.types.rest import HelpPath
|
||||
from uds.core import exceptions, types
|
||||
from uds.core import exceptions, types, consts
|
||||
from uds.core.util import permissions
|
||||
from uds.core.util.cache import Cache
|
||||
from uds.core.util.model import process_uuid, sql_now
|
||||
@ -144,19 +143,18 @@ class System(Handler):
|
||||
}
|
||||
"""
|
||||
|
||||
needs_admin = False
|
||||
needs_staff = True
|
||||
min_access_role = consts.UserRole.STAFF
|
||||
|
||||
help_paths = [
|
||||
HelpPath('', ''),
|
||||
HelpPath('stats/assigned', ''),
|
||||
HelpPath('stats/inuse', ''),
|
||||
HelpPath('stats/cached', ''),
|
||||
HelpPath('stats/complete', ''),
|
||||
HelpPath('stats/assigned/<servicePoolId>', ''),
|
||||
HelpPath('stats/inuse/<servicePoolId>', ''),
|
||||
HelpPath('stats/cached/<servicePoolId>', ''),
|
||||
HelpPath('stats/complete/<servicePoolId>', ''),
|
||||
types.rest.HelpPath('', ''),
|
||||
types.rest.HelpPath('stats/assigned', ''),
|
||||
types.rest.HelpPath('stats/inuse', ''),
|
||||
types.rest.HelpPath('stats/cached', ''),
|
||||
types.rest.HelpPath('stats/complete', ''),
|
||||
types.rest.HelpPath('stats/assigned/<servicePoolId>', ''),
|
||||
types.rest.HelpPath('stats/inuse/<servicePoolId>', ''),
|
||||
types.rest.HelpPath('stats/cached/<servicePoolId>', ''),
|
||||
types.rest.HelpPath('stats/complete/<servicePoolId>', ''),
|
||||
]
|
||||
help_text = 'Provides system information. Must be admin to access this'
|
||||
|
||||
|
@ -40,7 +40,7 @@ from uds import models
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
from uds.core.util.model import process_uuid
|
||||
from uds.core.util import ensure
|
||||
from uds.core import exceptions
|
||||
from uds.core import consts, exceptions
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -89,7 +89,7 @@ class Tickets(Handler):
|
||||
- servicePool has these groups in it's allowed list
|
||||
"""
|
||||
|
||||
needs_admin = True # By default, staff is lower level needed
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
@staticmethod
|
||||
def result(result: str = '', error: typing.Optional[str] = None) -> dict[str, typing.Any]:
|
||||
|
@ -34,7 +34,7 @@ import logging
|
||||
import typing
|
||||
|
||||
from uds import models
|
||||
from uds.core import exceptions, types
|
||||
from uds.core import consts, exceptions, types
|
||||
from uds.core.auths.auth import is_trusted_source
|
||||
from uds.core.util import log, net
|
||||
from uds.core.util.model import sql_stamp_seconds
|
||||
@ -54,7 +54,7 @@ class TunnelTicket(Handler):
|
||||
Processes tunnel requests
|
||||
"""
|
||||
|
||||
authenticated = False # Client requests are not authenticated
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
path = 'tunnel'
|
||||
name = 'ticket'
|
||||
|
||||
@ -148,7 +148,8 @@ class TunnelTicket(Handler):
|
||||
|
||||
|
||||
class TunnelRegister(ServerRegisterBase):
|
||||
needs_admin = True
|
||||
min_access_role = consts.UserRole.ADMIN
|
||||
|
||||
path = 'tunnel'
|
||||
name = 'register'
|
||||
|
||||
|
@ -40,7 +40,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UDSVersion(Handler):
|
||||
authenticated = False # Version requests are public
|
||||
min_access_role = consts.UserRole.ANONYMOUS
|
||||
name = 'version'
|
||||
|
||||
def get(self) -> collections.abc.MutableMapping[str, typing.Any]:
|
||||
|
@ -72,8 +72,7 @@ class ModelHandler(BaseModelHandler):
|
||||
"""
|
||||
|
||||
# Authentication related
|
||||
authenticated = True
|
||||
needs_staff = True
|
||||
min_access_role = consts.UserRole.STAFF
|
||||
|
||||
# Which model does this manage, must be a django model ofc
|
||||
model: 'typing.ClassVar[type[models.Model]]'
|
||||
@ -297,8 +296,8 @@ class ModelHandler(BaseModelHandler):
|
||||
# if has custom methods, look for if this request matches any of them
|
||||
for cm in self.custom_methods:
|
||||
# Convert to snake case
|
||||
camel_case_name, snake_case_name = camel_and_snake_case_from(cm[0])
|
||||
if number_of_args > 1 and cm[1] is True: # Method needs parent (existing item)
|
||||
camel_case_name, snake_case_name = camel_and_snake_case_from(cm.name)
|
||||
if number_of_args > 1 and cm.needs_parent: # Method needs parent (existing item)
|
||||
if self._args[1] in (camel_case_name, snake_case_name):
|
||||
item = None
|
||||
# Check if operation method exists
|
||||
|
@ -45,7 +45,7 @@ if typing.TYPE_CHECKING:
|
||||
from django.http import HttpRequest
|
||||
|
||||
|
||||
@weblogin_required(role=consts.Roles.ADMIN)
|
||||
@weblogin_required(role=consts.UserRole.ADMIN)
|
||||
def index(request: 'HttpRequest') -> HttpResponse:
|
||||
# Gets csrf token
|
||||
csrf_token = csrf.get_token(request)
|
||||
|
@ -117,7 +117,7 @@ def root_user() -> models.User:
|
||||
|
||||
# Decorator to make easier protect pages that needs to be logged in
|
||||
def weblogin_required(
|
||||
role: typing.Optional[consts.Roles] = None,
|
||||
role: typing.Optional[consts.UserRole] = None,
|
||||
) -> collections.abc.Callable[
|
||||
[collections.abc.Callable[..., HttpResponse]], collections.abc.Callable[..., HttpResponse]
|
||||
]:
|
||||
@ -149,8 +149,8 @@ def weblogin_required(
|
||||
if not request.user or not request.authorized:
|
||||
return weblogout(request)
|
||||
|
||||
if role in (consts.Roles.ADMIN, consts.Roles.STAFF):
|
||||
if request.user.is_staff() is False or (role == consts.Roles.ADMIN and not request.user.is_admin):
|
||||
if role in (consts.UserRole.ADMIN, consts.UserRole.STAFF):
|
||||
if request.user.is_staff() is False or (role == consts.UserRole.ADMIN and not request.user.is_admin):
|
||||
return HttpResponseForbidden(_('Forbidden'))
|
||||
|
||||
return view_func(request, *args, **kwargs)
|
||||
|
@ -77,14 +77,45 @@ UNLIMITED: typing.Final[int] = -1
|
||||
NO_MORE_NAMES: typing.Final[str] = 'NO-NAME-ERROR'
|
||||
|
||||
|
||||
class Roles(enum.StrEnum):
|
||||
class UserRole(enum.StrEnum):
|
||||
"""
|
||||
Roles for users
|
||||
"""
|
||||
|
||||
ADMIN = 'admin'
|
||||
STAFF = 'staff'
|
||||
|
||||
|
||||
# Currently not used, but reserved
|
||||
USER = 'user'
|
||||
ANONYMOUS = 'anonymous'
|
||||
|
||||
@property
|
||||
def needs_authentication(self) -> bool:
|
||||
"""
|
||||
Checks if this role needs authentication
|
||||
|
||||
Returns:
|
||||
True if this role needs authentication, False otherwise
|
||||
"""
|
||||
return self != UserRole.ANONYMOUS
|
||||
|
||||
def can_access(self, role: 'UserRole') -> bool:
|
||||
"""
|
||||
Checks if this role can access to the requested role
|
||||
|
||||
That is, if this role is greater or equal to the requested role
|
||||
|
||||
Args:
|
||||
role: Role to check against
|
||||
|
||||
Returns:
|
||||
True if this role can access to the requested role, False otherwise
|
||||
"""
|
||||
ROLE_PRECEDENCE: typing.Final = {
|
||||
UserRole.ADMIN: 3,
|
||||
UserRole.STAFF: 2,
|
||||
UserRole.USER: 1,
|
||||
UserRole.ANONYMOUS: 0,
|
||||
}
|
||||
|
||||
return ROLE_PRECEDENCE[self] >= ROLE_PRECEDENCE[role]
|
||||
|
@ -30,6 +30,8 @@
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import abc
|
||||
import enum
|
||||
import re
|
||||
import typing
|
||||
import dataclasses
|
||||
import collections.abc
|
||||
@ -96,7 +98,8 @@ class TypeInfo:
|
||||
|
||||
# This is a named tuple for convenience, and must be
|
||||
# compatible with tuple[str, bool] (name, needs_parent)
|
||||
class ModelCustomMethod(typing.NamedTuple):
|
||||
@dataclasses.dataclass
|
||||
class ModelCustomMethod:
|
||||
name: str
|
||||
needs_parent: bool = True
|
||||
|
||||
@ -109,17 +112,80 @@ ItemGeneratorType = typing.Generator[ItemDictType, None, None]
|
||||
# Alias for get_items return type
|
||||
ManyItemsDictType = typing.Union[ItemListType, ItemDictType, ItemGeneratorType]
|
||||
|
||||
#
|
||||
#
|
||||
FieldType = collections.abc.Mapping[str, typing.Any]
|
||||
|
||||
# Regular expression to match the API: part of the docstring
|
||||
# should be a multi line string, with a line containing only "API:" (with leading and trailing \s)
|
||||
API_RE = re.compile(r'(?ms)^\s*API:\s*$')
|
||||
|
||||
class HelpPath(typing.NamedTuple):
|
||||
|
||||
@dataclasses.dataclass(eq=False)
|
||||
class HelpPath:
|
||||
"""
|
||||
Help helper class
|
||||
"""
|
||||
|
||||
path: str
|
||||
help: str
|
||||
text: str
|
||||
|
||||
def __init__(self, path: str, help: str):
|
||||
self.path = path
|
||||
self.text = HelpPath.process_help(help)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.path)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, HelpPath):
|
||||
return False
|
||||
return self.path == other.path
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
return self.path == '' and self.text == ''
|
||||
|
||||
@staticmethod
|
||||
def process_help(help: str) -> str:
|
||||
"""
|
||||
Processes the help string, removing leading and trailing spaces
|
||||
"""
|
||||
match = API_RE.search(help)
|
||||
if match:
|
||||
return help[match.end() :].strip()
|
||||
|
||||
return ''
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class HelpNode:
|
||||
class HelpNodeType(enum.StrEnum):
|
||||
MODEL = 'model'
|
||||
DETAIL = 'detail'
|
||||
CUSTOM = 'custom'
|
||||
PATH = 'path'
|
||||
|
||||
help: HelpPath
|
||||
children: list['HelpNode'] # Children nodes
|
||||
kind: HelpNodeType
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.help.path)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if isinstance(other, HelpNode):
|
||||
return self.help.path == other.help.path
|
||||
if not isinstance(other, HelpPath):
|
||||
return False
|
||||
|
||||
return self.help.path == other.path
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
return self.help.is_empty and not self.children
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'HelpNode({self.help}, {self.children})'
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
@ -144,7 +210,7 @@ class HandlerNode:
|
||||
Returns a string representation of the tree
|
||||
"""
|
||||
from uds.REST.model import ModelHandler
|
||||
|
||||
|
||||
if self.handler is None:
|
||||
return f'{" " * level}|- {self.name}\n' + ''.join(
|
||||
child.tree(level + 1) for child in self.children.values()
|
||||
@ -163,13 +229,66 @@ class HandlerNode:
|
||||
|
||||
return ret + ''.join(child.tree(level + 1) for child in self.children.values())
|
||||
|
||||
def help_node(self) -> HelpNode:
|
||||
"""
|
||||
Returns a HelpNode for this node (and children recursively)
|
||||
"""
|
||||
from uds.REST.model import ModelHandler
|
||||
|
||||
custom_help: set[HelpNode] = set()
|
||||
|
||||
help_node_type = HelpNode.HelpNodeType.PATH
|
||||
|
||||
if self.handler:
|
||||
help_node_type = HelpNode.HelpNodeType.CUSTOM
|
||||
if issubclass(self.handler, ModelHandler):
|
||||
help_node_type = HelpNode.HelpNodeType.MODEL
|
||||
# Add custom_methods
|
||||
for method in self.handler.custom_methods:
|
||||
# Method is a Me CustomModelMethod,
|
||||
# We access the __doc__ of the function inside the handler with method.name
|
||||
doc = getattr(self.handler, method.name).__doc__ or ''
|
||||
custom_help.add(
|
||||
HelpNode(
|
||||
HelpPath(path=self.full_path() + '/' + method.name, help=doc),
|
||||
[],
|
||||
HelpNode.HelpNodeType.CUSTOM,
|
||||
)
|
||||
)
|
||||
|
||||
# Add detail methods
|
||||
if self.handler.detail:
|
||||
for method in self.handler.detail.keys():
|
||||
custom_help.add(
|
||||
HelpNode(
|
||||
HelpPath(path=self.full_path() + '/' + method, help=''),
|
||||
[],
|
||||
HelpNode.HelpNodeType.DETAIL,
|
||||
)
|
||||
)
|
||||
|
||||
custom_help |= {
|
||||
HelpNode(HelpPath(path=help_info.path, help=help_info.text), [], help_node_type)
|
||||
for help_info in self.handler.help_paths
|
||||
}
|
||||
|
||||
custom_help |= {child.help_node() for child in self.children.values()}
|
||||
|
||||
return HelpNode(
|
||||
help=HelpPath(path=self.full_path(), help=self.handler.__doc__ or ''),
|
||||
children=list(custom_help),
|
||||
kind=help_node_type,
|
||||
)
|
||||
|
||||
def find_path(self, path: str | list[str]) -> typing.Optional['HandlerNode']:
|
||||
"""
|
||||
Returns the node for a given path, or None if not found
|
||||
"""
|
||||
if not path or not self.children:
|
||||
return self
|
||||
path = path.split('/') if isinstance(path, str) else path
|
||||
|
||||
# Remove any trailing '/' to allow some "bogus" paths with trailing slashes
|
||||
path = path.lstrip('/').split('/') if isinstance(path, str) else path
|
||||
|
||||
if path[0] not in self.children:
|
||||
return None
|
||||
@ -188,4 +307,4 @@ class HandlerNode:
|
||||
if parent_full_path == '':
|
||||
return self.name
|
||||
|
||||
return f'{parent_full_path}/{self.name}'
|
||||
return f'{parent_full_path}/{self.name}'
|
||||
|
@ -35,7 +35,7 @@ import typing
|
||||
|
||||
from django.db import models
|
||||
from django.db.models import Count, Q, signals
|
||||
from uds.core import auths, mfas, types
|
||||
from uds.core import auths, mfas, types, consts
|
||||
from uds.core.util import log, storage, properties
|
||||
|
||||
from .authenticator import Authenticator
|
||||
@ -46,7 +46,6 @@ from .uuid_model import UUIDModel
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from uds.models import Group, UserService, Permissions
|
||||
from uds.core.types.requests import ExtendedHttpRequest
|
||||
from django.db.models.manager import RelatedManager
|
||||
|
||||
|
||||
@ -133,6 +132,26 @@ class User(UUIDModel, properties.PropertiesMixin):
|
||||
"""
|
||||
return self.staff_member or self.is_admin
|
||||
|
||||
def get_role(self) -> consts.UserRole:
|
||||
"""
|
||||
Returns the role of the user
|
||||
"""
|
||||
if self.pk is None:
|
||||
return consts.UserRole.ANONYMOUS
|
||||
|
||||
if self.is_admin:
|
||||
return consts.UserRole.ADMIN
|
||||
if self.staff_member:
|
||||
return consts.UserRole.STAFF
|
||||
|
||||
return consts.UserRole.USER
|
||||
|
||||
def can_access(self, role: consts.UserRole) -> bool:
|
||||
"""
|
||||
Returns true if the user has more or equal role than the one passed as argument
|
||||
"""
|
||||
return self.get_role().can_access(role)
|
||||
|
||||
def update_last_access(self) -> None:
|
||||
"""
|
||||
Updates the last access for this user with the current time of the sql server
|
||||
@ -140,7 +159,7 @@ class User(UUIDModel, properties.PropertiesMixin):
|
||||
self.last_access = sql_now()
|
||||
self.save(update_fields=['last_access'])
|
||||
|
||||
def logout(self, request: 'ExtendedHttpRequest') -> types.auth.AuthenticationResult:
|
||||
def logout(self, request: 'types.requests.ExtendedHttpRequest') -> types.auth.AuthenticationResult:
|
||||
"""
|
||||
Invoked to log out this user
|
||||
Returns the url where to redirect user, or None if default url will be used
|
||||
|
@ -11,7 +11,7 @@
|
||||
}
|
||||
</script>
|
||||
<script type="text/javascript" src="{% url 'utility.jsCatalog' LANGUAGE_CODE %}"></script>
|
||||
<script type="text/javascript" src="{% url 'utility-adm.js' %}"></script>
|
||||
<script type="text/javascript" src="{% url 'utility.js' %}"></script>
|
||||
|
||||
<script type="text/javascript">
|
||||
function onLoad() {
|
||||
|
39
server/src/uds/templates/uds/modern/documentation.html
Normal file
39
server/src/uds/templates/uds/modern/documentation.html
Normal file
@ -0,0 +1,39 @@
|
||||
{% load i18n %}{% get_current_language as LANGUAGE_CODE %}<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1" />
|
||||
<link rel="icon" type="image/png" href="/uds/res/modern/img/favicon.png" />
|
||||
|
||||
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
|
||||
|
||||
<title data-i18n="uds:documentation">Documentation</title>
|
||||
|
||||
<link href="/uds/res/modern/fonts/material-icons.css" rel="stylesheet" />
|
||||
<link href="/uds/res/modern/fonts/roboto.css" rel="stylesheet" />
|
||||
<!-- Material styles -->
|
||||
<link href="/uds/res/modern/styles.css" rel="stylesheet" />
|
||||
</head>
|
||||
<body>
|
||||
<div>
|
||||
<div class="header">
|
||||
<div class="header-logo">
|
||||
<a href="/"><img src="/uds/res/modern/img/logo.png" alt="UDS" /></a>
|
||||
</div>
|
||||
<div class="header-title">
|
||||
<h1 data-i18n="uds:documentation">Documentation</h1>
|
||||
</div>
|
||||
</div>
|
||||
<!-- doc contains the list of helps -->
|
||||
<div class="doc">
|
||||
{% for h in help %}
|
||||
<div class="doc-item">
|
||||
{{ h }}
|
||||
</div>
|
||||
{% endfor %}
|
||||
|
||||
</div>
|
||||
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
@ -46,7 +46,7 @@ if typing.TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@weblogin_required(role=consts.Roles.STAFF)
|
||||
@weblogin_required(role=consts.UserRole.STAFF)
|
||||
def download(request: 'HttpRequest', download_id: str) -> 'HttpResponse':
|
||||
"""
|
||||
Downloadables management
|
||||
|
@ -424,7 +424,7 @@ class TestOpenStackClient(UDSTransactionTestCase):
|
||||
def test_auth_cached(self) -> None:
|
||||
# Get a new client, it should be cached
|
||||
cached_value = self.oclient.cache.get('auth')
|
||||
# Unauthorized
|
||||
# Unauthorized
|
||||
self.oclient._authenticated = False
|
||||
|
||||
with mock.patch.object(self.oclient.cache, 'get', return_value=cached_value) as mock_cache_get:
|
||||
|
@ -103,7 +103,7 @@ class UDSClientMixin:
|
||||
kwargs['headers'] = self.uds_headers
|
||||
|
||||
def compose_rest_url(self, method: str) -> str:
|
||||
return f'{REST_PATH}/{method}'
|
||||
return f'{REST_PATH}{method}'
|
||||
|
||||
|
||||
class UDSClient(UDSClientMixin, Client):
|
||||
|
Loading…
x
Reference in New Issue
Block a user