diff --git a/server/src/uds/REST/documentation.py b/server/src/uds/REST/documentation.py index b129f9ca7..079d19bb4 100644 --- a/server/src/uds/REST/documentation.py +++ b/server/src/uds/REST/documentation.py @@ -29,17 +29,22 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ +import dataclasses import logging import typing from django import http +from django.shortcuts import render from django.views.generic.base import View +from uds.core.auths import auth +from uds.core import consts, types + from .dispatcher import Dispatcher # Not imported at runtime, just for type checking if typing.TYPE_CHECKING: - pass + from uds.core import types logger = logging.getLogger(__name__) @@ -49,6 +54,41 @@ class Documentation(View): def dispatch( self, request: 'http.request.HttpRequest', *_args: typing.Any, **kwargs: typing.Any ) -> 'http.HttpResponse': + request = typing.cast('types.requests.ExtendedHttpRequest', request) + if not request.user or not request.authorized: + return auth.weblogout(request) + + if not request.user.get_role().can_access(consts.UserRole.STAFF): + return auth.weblogout(request) + + @dataclasses.dataclass + class HelpInfo: + level: int + path: str + text: str + + help_data: list[HelpInfo] = [] + + def _process_node(node: 'types.rest.HelpNode', path: str, level: int) -> None: + help_data.append(HelpInfo(level, path, node.help.text)) + + for child in node.children: + _process_node( + child, + path + '/' + child.help.path, + level + (0 if node.kind == types.rest.HelpNode.HelpNodeType.PATH else 1), + ) + + _process_node(Dispatcher.base_handler_node.help_node(), '', 0) + + response = render( + request=request, + template_name='uds/modern/documentation.html', + context={'help': help_data}, + ) + + return response + service = Dispatcher.base_handler_node - - return http.HttpResponseServerError(f'{service.tree()}', content_type="text/plain") + + # return http.HttpResponseServerError(f'{service.tree()}', content_type="text/plain") diff --git a/server/src/uds/REST/handlers.py b/server/src/uds/REST/handlers.py index dc8cf964a..1a2462841 100644 --- a/server/src/uds/REST/handlers.py +++ b/server/src/uds/REST/handlers.py @@ -52,6 +52,7 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) + class Handler: """ REST requests handler base class @@ -63,13 +64,10 @@ class Handler: path: typing.ClassVar[typing.Optional[str]] = ( None # Path for this method, so we can do /auth/login, /auth/logout, /auth/auths in a simple way ) - authenticated: typing.ClassVar[bool] = ( - True # By default, all handlers needs authentication. Will be overwriten if needs_admin or needs_staff, - ) - needs_admin: typing.ClassVar[bool] = ( - False # By default, the methods will be accessible by anyone if nothing else indicated - ) - needs_staff: typing.ClassVar[bool] = False # By default, staff + + min_access_role: typing.ClassVar[consts.UserRole] = ( + consts.UserRole.USER + ) # By default, only users can access # For implementing help # A list of pairs of (path, help) for subpaths on this handler @@ -85,7 +83,9 @@ class Handler: # These are the "path" split by /, that is, the REST invocation arguments _args: list[str] _kwargs: dict[str, typing.Any] # This are the "path" split by /, that is, the REST invocation arguments - _headers: dict[str, str] # Note: These are "output" headers, not input headers (input headers can be retrieved from request) + _headers: dict[ + str, str + ] # Note: These are "output" headers, not input headers (input headers can be retrieved from request) _session: typing.Optional[SessionStore] _auth_token: typing.Optional[str] _user: 'User' @@ -103,14 +103,6 @@ class Handler: *args: str, **kwargs: typing.Any, ): - logger.debug('Data: %s %s %s', self.__class__, self.needs_admin, self.authenticated) - if ( - self.needs_admin or self.needs_staff - ) and not self.authenticated: # If needs_admin, must also be authenticated - raise Exception( - f'class {self.__class__} is not authenticated but has needs_admin or needs_staff set!!' - ) - self._request = request self._path = path self._operation = method @@ -119,7 +111,8 @@ class Handler: self._kwargs = kwargs self._headers = {} self._auth_token = None - if self.authenticated: # Only retrieve auth related data on authenticated handlers + + if self.min_access_role.needs_authentication: try: self._auth_token = self._request.headers.get(consts.auth.AUTH_TOKEN_HEADER, '') self._session = SessionStore(session_key=self._auth_token) @@ -132,11 +125,10 @@ class Handler: if self._auth_token is None: raise AccessDenied() - if self.needs_admin and not self.is_admin(): + self._user = self.get_user() + if not self._user.can_access(self.min_access_role): raise AccessDenied() - if self.needs_staff and not self.is_staff_member(): - raise AccessDenied() try: self._user = self.get_user() except Exception as e: @@ -158,10 +150,10 @@ class Handler: def header(self, header_name: str) -> typing.Optional[str]: """ Get's an specific header name from REST request - + Args: header_name: Name of header to retrieve - + Returns: Value of header or None if not found """ diff --git a/server/src/uds/REST/methods/accounts.py b/server/src/uds/REST/methods/accounts.py index c971a6abe..88643fdbe 100644 --- a/server/src/uds/REST/methods/accounts.py +++ b/server/src/uds/REST/methods/accounts.py @@ -89,12 +89,41 @@ class Accounts(ModelHandler): return self.add_default_fields([], ['name', 'comments', 'tags']) def timemark(self, item: 'Model') -> typing.Any: + """ + API: + Description: + Generates a time mark associated with the account. + This is useful to easily identify when the account data was last updated. + (For example, one user enters an service, we get the usage data and "timemark" it, later read again + and we can identify that all data before this timemark has already been processed) + + Parameters: + - item: Account + Description of the item parameter + + Response: + 200: All fine, no data returned + """ item = ensure.is_instance(item, Account) item.time_mark = datetime.datetime.now() item.save() return '' def clear(self, item: 'Model') -> typing.Any: + """ + Api documentation for the method. From here, will be used by the documentation generator + Always starts with API: + API: + Description: + Clears all usage associated with the account + + Parameters: + - item: Account + Description of the item parameter + + Response: + 200: All fine, no data returned + """ item = ensure.is_instance(item, Account) self.ensure_has_access(item, uds.core.types.permissions.PermissionType.MANAGEMENT) return item.usages.filter(user_service=None).delete() diff --git a/server/src/uds/REST/methods/actor_v3.py b/server/src/uds/REST/methods/actor_v3.py index e0f4fcbfc..a3ce2a016 100644 --- a/server/src/uds/REST/methods/actor_v3.py +++ b/server/src/uds/REST/methods/actor_v3.py @@ -145,7 +145,7 @@ def clear_failed_ip_counter(request: 'ExtendedHttpRequest') -> None: class ActorV3Action(Handler): - authenticated = False # Actor requests are not authenticated normally + min_access_role = consts.UserRole.ANONYMOUS path = 'actor/v3' @staticmethod @@ -291,8 +291,7 @@ class Register(ActorV3Action): """ - authenticated = True - needs_staff = True + min_access_role = consts.UserRole.STAFF name = 'register' diff --git a/server/src/uds/REST/methods/cache.py b/server/src/uds/REST/methods/cache.py index aba8e23b7..d5026dcff 100644 --- a/server/src/uds/REST/methods/cache.py +++ b/server/src/uds/REST/methods/cache.py @@ -35,7 +35,7 @@ import typing from django.core.cache import caches -from uds.core import exceptions +from uds.core import exceptions, consts from uds.core.util.cache import Cache as UCache from uds.REST import Handler @@ -44,8 +44,7 @@ logger = logging.getLogger(__name__) # Enclosed methods under /cache path class Cache(Handler): - authenticated = True - needs_admin = True + min_access_role = consts.UserRole.ADMIN def get(self) -> typing.Any: """ diff --git a/server/src/uds/REST/methods/client.py b/server/src/uds/REST/methods/client.py index 16daac372..dfb6e5797 100644 --- a/server/src/uds/REST/methods/client.py +++ b/server/src/uds/REST/methods/client.py @@ -58,7 +58,7 @@ class Client(Handler): Processes Client requests """ - authenticated = False # Client requests are not authenticated + min_access_role = consts.UserRole.ANONYMOUS @staticmethod def result( diff --git a/server/src/uds/REST/methods/config.py b/server/src/uds/REST/methods/config.py index a37b687c4..9ffe62d68 100644 --- a/server/src/uds/REST/methods/config.py +++ b/server/src/uds/REST/methods/config.py @@ -33,6 +33,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com import typing import logging +from uds.core import consts from uds.core.util.config import Config as CfgConfig from uds.REST import Handler @@ -42,7 +43,7 @@ logger = logging.getLogger(__name__) # Enclosed methods under /config path class Config(Handler): - needs_admin = True # By default, staff is lower level needed + min_access_role = consts.UserRole.ADMIN def get(self) -> typing.Any: return CfgConfig.get_config_values(self.is_admin()) diff --git a/server/src/uds/REST/methods/connection.py b/server/src/uds/REST/methods/connection.py index dc10dc88a..8fb2c69a9 100644 --- a/server/src/uds/REST/methods/connection.py +++ b/server/src/uds/REST/methods/connection.py @@ -34,7 +34,7 @@ import datetime import logging import typing -from uds.core import exceptions, types +from uds.core import exceptions, types, consts from uds.core.managers.crypto import CryptoManager from uds.core.managers.userservice import UserServiceManager from uds.core.exceptions.services import ServiceNotReadyError @@ -51,9 +51,7 @@ class Connection(Handler): Processes actor requests """ - authenticated = True # Actor requests are not authenticated - needs_admin = False - needs_staff = False + min_access_role = consts.UserRole.USER @staticmethod def result( diff --git a/server/src/uds/REST/methods/gui_callback.py b/server/src/uds/REST/methods/gui_callback.py index e84375ccc..b4076e4fa 100644 --- a/server/src/uds/REST/methods/gui_callback.py +++ b/server/src/uds/REST/methods/gui_callback.py @@ -32,7 +32,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ import logging -from uds.core import exceptions, types +from uds.core import exceptions, types, consts from uds.core.ui import gui from uds.REST import Handler @@ -43,8 +43,8 @@ logger = logging.getLogger(__name__) class Callback(Handler): path = 'gui' - authenticated = True - needs_staff = True + + min_access_role = consts.UserRole.STAFF def get(self) -> types.ui.CallbackResultType: if len(self._args) != 1: diff --git a/server/src/uds/REST/methods/login_logout.py b/server/src/uds/REST/methods/login_logout.py index 90c770cc4..058a6d637 100644 --- a/server/src/uds/REST/methods/login_logout.py +++ b/server/src/uds/REST/methods/login_logout.py @@ -56,7 +56,7 @@ class Login(Handler): """ path = 'auth' - authenticated = False # Public method + min_access_role = consts.UserRole.ANONYMOUS @staticmethod def result( @@ -221,7 +221,7 @@ class Logout(Handler): class Auths(Handler): path = 'auth' - authenticated = False # By default, all handlers needs authentication + min_access_role = consts.UserRole.ANONYMOUS def auths(self) -> collections.abc.Iterable[dict[str, typing.Any]]: all_param: bool = self._params.get('all', 'false').lower() == 'true' diff --git a/server/src/uds/REST/methods/meta_pools.py b/server/src/uds/REST/methods/meta_pools.py index e6c41d57a..9edb284e3 100644 --- a/server/src/uds/REST/methods/meta_pools.py +++ b/server/src/uds/REST/methods/meta_pools.py @@ -109,8 +109,8 @@ class MetaPools(ModelHandler): ] custom_methods = [ - types.rest.ModelCustomMethod('setFallbackAccess', True), - types.rest.ModelCustomMethod('getFallbackAccess', True), + types.rest.ModelCustomMethod('set_fallback_access', True), + types.rest.ModelCustomMethod('get_fallback_access', True), ] def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]: diff --git a/server/src/uds/REST/methods/permissions.py b/server/src/uds/REST/methods/permissions.py index b69ad56de..47e9e0117 100644 --- a/server/src/uds/REST/methods/permissions.py +++ b/server/src/uds/REST/methods/permissions.py @@ -36,7 +36,7 @@ import typing import uds.core.types.permissions from uds import models -from uds.core import exceptions +from uds.core import consts, exceptions from uds.core.util import permissions from uds.core.util.rest.tools import match from uds.REST import Handler @@ -53,8 +53,7 @@ class Permissions(Handler): """ Processes permissions requests """ - - needs_admin = True + min_access_role = consts.UserRole.ADMIN @staticmethod def get_class(class_name: str) -> type['Model']: diff --git a/server/src/uds/REST/methods/reports.py b/server/src/uds/REST/methods/reports.py index ef233d1d2..a64c8ef6d 100644 --- a/server/src/uds/REST/methods/reports.py +++ b/server/src/uds/REST/methods/reports.py @@ -63,9 +63,8 @@ class Reports(model.BaseModelHandler): """ Processes reports requests """ - - needs_admin = True # By default, staff is lower level needed - + min_access_role = consts.UserRole.ADMIN + table_title = _('Available reports') table_fields = [ {'group': {'title': _('Group')}}, diff --git a/server/src/uds/REST/methods/servers.py b/server/src/uds/REST/methods/servers.py index d513f804b..f5ad70e38 100644 --- a/server/src/uds/REST/methods/servers.py +++ b/server/src/uds/REST/methods/servers.py @@ -138,14 +138,15 @@ class ServerRegisterBase(Handler): class ServerRegister(ServerRegisterBase): - needs_staff = True + min_access_role = consts.UserRole.STAFF + path = 'servers' name = 'register' # REST handlers for server actions class ServerTest(Handler): - authenticated = False # Test is not authenticated, the auth is the token to test itself + min_access_role = consts.UserRole.ANONYMOUS path = 'servers' name = 'test' @@ -172,7 +173,7 @@ class ServerEvent(Handler): * log """ - authenticated = False # Actor requests are not authenticated normally + min_access_role = consts.UserRole.ANONYMOUS path = 'servers' name = 'event' diff --git a/server/src/uds/REST/methods/services_pool_groups.py b/server/src/uds/REST/methods/services_pool_groups.py index 5ada06192..af59edcd8 100644 --- a/server/src/uds/REST/methods/services_pool_groups.py +++ b/server/src/uds/REST/methods/services_pool_groups.py @@ -58,8 +58,6 @@ class ServicesPoolGroups(ModelHandler): Handles the gallery REST interface """ - # needs_admin = True - path = 'gallery' model = ServicePoolGroup save_fields = ['name', 'comments', 'image_id', 'priority'] diff --git a/server/src/uds/REST/methods/stats.py b/server/src/uds/REST/methods/stats.py index 5c6e279e5..091c33bfd 100644 --- a/server/src/uds/REST/methods/stats.py +++ b/server/src/uds/REST/methods/stats.py @@ -34,8 +34,7 @@ import logging import datetime import typing -from uds.core.types.rest import HelpPath -from uds.core import types +from uds.core import types, consts from uds.REST import Handler from uds import models from uds.core.util.stats import counters @@ -45,11 +44,10 @@ logger = logging.getLogger(__name__) # Enclosed methods under /cache path class Stats(Handler): - authenticated = True - needs_admin = True + min_access_role = consts.UserRole.ADMIN help_paths = [ - HelpPath('', 'Returns the last day usage statistics for all authenticators'), + types.rest.HelpPath('', 'Returns the last day usage statistics for all authenticators'), ] help_text = 'Provides access to usage statistics' diff --git a/server/src/uds/REST/methods/system.py b/server/src/uds/REST/methods/system.py index 8b5e33300..82f01fbab 100644 --- a/server/src/uds/REST/methods/system.py +++ b/server/src/uds/REST/methods/system.py @@ -38,8 +38,7 @@ import pickletools import typing from uds import models -from uds.core.types.rest import HelpPath -from uds.core import exceptions, types +from uds.core import exceptions, types, consts from uds.core.util import permissions from uds.core.util.cache import Cache from uds.core.util.model import process_uuid, sql_now @@ -144,19 +143,18 @@ class System(Handler): } """ - needs_admin = False - needs_staff = True + min_access_role = consts.UserRole.STAFF help_paths = [ - HelpPath('', ''), - HelpPath('stats/assigned', ''), - HelpPath('stats/inuse', ''), - HelpPath('stats/cached', ''), - HelpPath('stats/complete', ''), - HelpPath('stats/assigned/', ''), - HelpPath('stats/inuse/', ''), - HelpPath('stats/cached/', ''), - HelpPath('stats/complete/', ''), + types.rest.HelpPath('', ''), + types.rest.HelpPath('stats/assigned', ''), + types.rest.HelpPath('stats/inuse', ''), + types.rest.HelpPath('stats/cached', ''), + types.rest.HelpPath('stats/complete', ''), + types.rest.HelpPath('stats/assigned/', ''), + types.rest.HelpPath('stats/inuse/', ''), + types.rest.HelpPath('stats/cached/', ''), + types.rest.HelpPath('stats/complete/', ''), ] help_text = 'Provides system information. Must be admin to access this' diff --git a/server/src/uds/REST/methods/tickets.py b/server/src/uds/REST/methods/tickets.py index 0b3767184..e2e28305a 100644 --- a/server/src/uds/REST/methods/tickets.py +++ b/server/src/uds/REST/methods/tickets.py @@ -40,7 +40,7 @@ from uds import models from uds.core.managers.crypto import CryptoManager from uds.core.util.model import process_uuid from uds.core.util import ensure -from uds.core import exceptions +from uds.core import consts, exceptions logger = logging.getLogger(__name__) @@ -89,7 +89,7 @@ class Tickets(Handler): - servicePool has these groups in it's allowed list """ - needs_admin = True # By default, staff is lower level needed + min_access_role = consts.UserRole.ADMIN @staticmethod def result(result: str = '', error: typing.Optional[str] = None) -> dict[str, typing.Any]: diff --git a/server/src/uds/REST/methods/tunnel_ticket.py b/server/src/uds/REST/methods/tunnel_ticket.py index 950eec6bd..493cfffff 100644 --- a/server/src/uds/REST/methods/tunnel_ticket.py +++ b/server/src/uds/REST/methods/tunnel_ticket.py @@ -34,7 +34,7 @@ import logging import typing from uds import models -from uds.core import exceptions, types +from uds.core import consts, exceptions, types from uds.core.auths.auth import is_trusted_source from uds.core.util import log, net from uds.core.util.model import sql_stamp_seconds @@ -54,7 +54,7 @@ class TunnelTicket(Handler): Processes tunnel requests """ - authenticated = False # Client requests are not authenticated + min_access_role = consts.UserRole.ANONYMOUS path = 'tunnel' name = 'ticket' @@ -148,7 +148,8 @@ class TunnelTicket(Handler): class TunnelRegister(ServerRegisterBase): - needs_admin = True + min_access_role = consts.UserRole.ADMIN + path = 'tunnel' name = 'register' diff --git a/server/src/uds/REST/methods/version.py b/server/src/uds/REST/methods/version.py index ca09334a8..00907f1a7 100644 --- a/server/src/uds/REST/methods/version.py +++ b/server/src/uds/REST/methods/version.py @@ -40,7 +40,7 @@ logger = logging.getLogger(__name__) class UDSVersion(Handler): - authenticated = False # Version requests are public + min_access_role = consts.UserRole.ANONYMOUS name = 'version' def get(self) -> collections.abc.MutableMapping[str, typing.Any]: diff --git a/server/src/uds/REST/model/model.py b/server/src/uds/REST/model/model.py index 289bfbf59..acb6aeb6f 100644 --- a/server/src/uds/REST/model/model.py +++ b/server/src/uds/REST/model/model.py @@ -72,8 +72,7 @@ class ModelHandler(BaseModelHandler): """ # Authentication related - authenticated = True - needs_staff = True + min_access_role = consts.UserRole.STAFF # Which model does this manage, must be a django model ofc model: 'typing.ClassVar[type[models.Model]]' @@ -297,8 +296,8 @@ class ModelHandler(BaseModelHandler): # if has custom methods, look for if this request matches any of them for cm in self.custom_methods: # Convert to snake case - camel_case_name, snake_case_name = camel_and_snake_case_from(cm[0]) - if number_of_args > 1 and cm[1] is True: # Method needs parent (existing item) + camel_case_name, snake_case_name = camel_and_snake_case_from(cm.name) + if number_of_args > 1 and cm.needs_parent: # Method needs parent (existing item) if self._args[1] in (camel_case_name, snake_case_name): item = None # Check if operation method exists diff --git a/server/src/uds/admin/views/__init__.py b/server/src/uds/admin/views/__init__.py index fbce2f906..13f630edb 100644 --- a/server/src/uds/admin/views/__init__.py +++ b/server/src/uds/admin/views/__init__.py @@ -45,7 +45,7 @@ if typing.TYPE_CHECKING: from django.http import HttpRequest -@weblogin_required(role=consts.Roles.ADMIN) +@weblogin_required(role=consts.UserRole.ADMIN) def index(request: 'HttpRequest') -> HttpResponse: # Gets csrf token csrf_token = csrf.get_token(request) diff --git a/server/src/uds/core/auths/auth.py b/server/src/uds/core/auths/auth.py index b82d5034e..ec64624c7 100644 --- a/server/src/uds/core/auths/auth.py +++ b/server/src/uds/core/auths/auth.py @@ -117,7 +117,7 @@ def root_user() -> models.User: # Decorator to make easier protect pages that needs to be logged in def weblogin_required( - role: typing.Optional[consts.Roles] = None, + role: typing.Optional[consts.UserRole] = None, ) -> collections.abc.Callable[ [collections.abc.Callable[..., HttpResponse]], collections.abc.Callable[..., HttpResponse] ]: @@ -149,8 +149,8 @@ def weblogin_required( if not request.user or not request.authorized: return weblogout(request) - if role in (consts.Roles.ADMIN, consts.Roles.STAFF): - if request.user.is_staff() is False or (role == consts.Roles.ADMIN and not request.user.is_admin): + if role in (consts.UserRole.ADMIN, consts.UserRole.STAFF): + if request.user.is_staff() is False or (role == consts.UserRole.ADMIN and not request.user.is_admin): return HttpResponseForbidden(_('Forbidden')) return view_func(request, *args, **kwargs) diff --git a/server/src/uds/core/consts/__init__.py b/server/src/uds/core/consts/__init__.py index 1ac8bec51..4fa8ed771 100644 --- a/server/src/uds/core/consts/__init__.py +++ b/server/src/uds/core/consts/__init__.py @@ -77,14 +77,45 @@ UNLIMITED: typing.Final[int] = -1 NO_MORE_NAMES: typing.Final[str] = 'NO-NAME-ERROR' -class Roles(enum.StrEnum): +class UserRole(enum.StrEnum): """ Roles for users """ ADMIN = 'admin' STAFF = 'staff' - + # Currently not used, but reserved USER = 'user' ANONYMOUS = 'anonymous' + + @property + def needs_authentication(self) -> bool: + """ + Checks if this role needs authentication + + Returns: + True if this role needs authentication, False otherwise + """ + return self != UserRole.ANONYMOUS + + def can_access(self, role: 'UserRole') -> bool: + """ + Checks if this role can access to the requested role + + That is, if this role is greater or equal to the requested role + + Args: + role: Role to check against + + Returns: + True if this role can access to the requested role, False otherwise + """ + ROLE_PRECEDENCE: typing.Final = { + UserRole.ADMIN: 3, + UserRole.STAFF: 2, + UserRole.USER: 1, + UserRole.ANONYMOUS: 0, + } + + return ROLE_PRECEDENCE[self] >= ROLE_PRECEDENCE[role] diff --git a/server/src/uds/core/types/rest.py b/server/src/uds/core/types/rest.py index d5636c040..1e398f1ea 100644 --- a/server/src/uds/core/types/rest.py +++ b/server/src/uds/core/types/rest.py @@ -30,6 +30,8 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ import abc +import enum +import re import typing import dataclasses import collections.abc @@ -96,7 +98,8 @@ class TypeInfo: # This is a named tuple for convenience, and must be # compatible with tuple[str, bool] (name, needs_parent) -class ModelCustomMethod(typing.NamedTuple): +@dataclasses.dataclass +class ModelCustomMethod: name: str needs_parent: bool = True @@ -109,17 +112,80 @@ ItemGeneratorType = typing.Generator[ItemDictType, None, None] # Alias for get_items return type ManyItemsDictType = typing.Union[ItemListType, ItemDictType, ItemGeneratorType] -# +# FieldType = collections.abc.Mapping[str, typing.Any] +# Regular expression to match the API: part of the docstring +# should be a multi line string, with a line containing only "API:" (with leading and trailing \s) +API_RE = re.compile(r'(?ms)^\s*API:\s*$') -class HelpPath(typing.NamedTuple): + +@dataclasses.dataclass(eq=False) +class HelpPath: """ Help helper class """ path: str - help: str + text: str + + def __init__(self, path: str, help: str): + self.path = path + self.text = HelpPath.process_help(help) + + def __hash__(self) -> int: + return hash(self.path) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, HelpPath): + return False + return self.path == other.path + + @property + def is_empty(self) -> bool: + return self.path == '' and self.text == '' + + @staticmethod + def process_help(help: str) -> str: + """ + Processes the help string, removing leading and trailing spaces + """ + match = API_RE.search(help) + if match: + return help[match.end() :].strip() + + return '' + + +@dataclasses.dataclass(frozen=True) +class HelpNode: + class HelpNodeType(enum.StrEnum): + MODEL = 'model' + DETAIL = 'detail' + CUSTOM = 'custom' + PATH = 'path' + + help: HelpPath + children: list['HelpNode'] # Children nodes + kind: HelpNodeType + + def __hash__(self) -> int: + return hash(self.help.path) + + def __eq__(self, other: object) -> bool: + if isinstance(other, HelpNode): + return self.help.path == other.help.path + if not isinstance(other, HelpPath): + return False + + return self.help.path == other.path + + @property + def is_empty(self) -> bool: + return self.help.is_empty and not self.children + + def __str__(self) -> str: + return f'HelpNode({self.help}, {self.children})' @dataclasses.dataclass(frozen=True) @@ -144,7 +210,7 @@ class HandlerNode: Returns a string representation of the tree """ from uds.REST.model import ModelHandler - + if self.handler is None: return f'{" " * level}|- {self.name}\n' + ''.join( child.tree(level + 1) for child in self.children.values() @@ -163,13 +229,66 @@ class HandlerNode: return ret + ''.join(child.tree(level + 1) for child in self.children.values()) + def help_node(self) -> HelpNode: + """ + Returns a HelpNode for this node (and children recursively) + """ + from uds.REST.model import ModelHandler + + custom_help: set[HelpNode] = set() + + help_node_type = HelpNode.HelpNodeType.PATH + + if self.handler: + help_node_type = HelpNode.HelpNodeType.CUSTOM + if issubclass(self.handler, ModelHandler): + help_node_type = HelpNode.HelpNodeType.MODEL + # Add custom_methods + for method in self.handler.custom_methods: + # Method is a Me CustomModelMethod, + # We access the __doc__ of the function inside the handler with method.name + doc = getattr(self.handler, method.name).__doc__ or '' + custom_help.add( + HelpNode( + HelpPath(path=self.full_path() + '/' + method.name, help=doc), + [], + HelpNode.HelpNodeType.CUSTOM, + ) + ) + + # Add detail methods + if self.handler.detail: + for method in self.handler.detail.keys(): + custom_help.add( + HelpNode( + HelpPath(path=self.full_path() + '/' + method, help=''), + [], + HelpNode.HelpNodeType.DETAIL, + ) + ) + + custom_help |= { + HelpNode(HelpPath(path=help_info.path, help=help_info.text), [], help_node_type) + for help_info in self.handler.help_paths + } + + custom_help |= {child.help_node() for child in self.children.values()} + + return HelpNode( + help=HelpPath(path=self.full_path(), help=self.handler.__doc__ or ''), + children=list(custom_help), + kind=help_node_type, + ) + def find_path(self, path: str | list[str]) -> typing.Optional['HandlerNode']: """ Returns the node for a given path, or None if not found """ if not path or not self.children: return self - path = path.split('/') if isinstance(path, str) else path + + # Remove any trailing '/' to allow some "bogus" paths with trailing slashes + path = path.lstrip('/').split('/') if isinstance(path, str) else path if path[0] not in self.children: return None @@ -188,4 +307,4 @@ class HandlerNode: if parent_full_path == '': return self.name - return f'{parent_full_path}/{self.name}' \ No newline at end of file + return f'{parent_full_path}/{self.name}' diff --git a/server/src/uds/models/user.py b/server/src/uds/models/user.py index ca774b51b..95ca1f734 100644 --- a/server/src/uds/models/user.py +++ b/server/src/uds/models/user.py @@ -35,7 +35,7 @@ import typing from django.db import models from django.db.models import Count, Q, signals -from uds.core import auths, mfas, types +from uds.core import auths, mfas, types, consts from uds.core.util import log, storage, properties from .authenticator import Authenticator @@ -46,7 +46,6 @@ from .uuid_model import UUIDModel # Not imported at runtime, just for type checking if typing.TYPE_CHECKING: from uds.models import Group, UserService, Permissions - from uds.core.types.requests import ExtendedHttpRequest from django.db.models.manager import RelatedManager @@ -133,6 +132,26 @@ class User(UUIDModel, properties.PropertiesMixin): """ return self.staff_member or self.is_admin + def get_role(self) -> consts.UserRole: + """ + Returns the role of the user + """ + if self.pk is None: + return consts.UserRole.ANONYMOUS + + if self.is_admin: + return consts.UserRole.ADMIN + if self.staff_member: + return consts.UserRole.STAFF + + return consts.UserRole.USER + + def can_access(self, role: consts.UserRole) -> bool: + """ + Returns true if the user has more or equal role than the one passed as argument + """ + return self.get_role().can_access(role) + def update_last_access(self) -> None: """ Updates the last access for this user with the current time of the sql server @@ -140,7 +159,7 @@ class User(UUIDModel, properties.PropertiesMixin): self.last_access = sql_now() self.save(update_fields=['last_access']) - def logout(self, request: 'ExtendedHttpRequest') -> types.auth.AuthenticationResult: + def logout(self, request: 'types.requests.ExtendedHttpRequest') -> types.auth.AuthenticationResult: """ Invoked to log out this user Returns the url where to redirect user, or None if default url will be used diff --git a/server/src/uds/templates/500.html b/server/src/uds/templates/500.html index fe1ab70b5..1c382a0e5 100644 --- a/server/src/uds/templates/500.html +++ b/server/src/uds/templates/500.html @@ -11,7 +11,7 @@ } - +