1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-20 06:50:23 +03:00

Started endpoint for REST API documentation endpoint and refactor authentication role checks to be more clear.

This commit is contained in:
Adolfo Gómez García 2025-01-25 19:42:13 +01:00
parent 84d565ec19
commit b9f4e7f2ea
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
14 changed files with 205 additions and 86 deletions

View File

@ -31,5 +31,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
# Convenience imports, must be present before initializing handlers
from .handlers import Handler
from .handlers import Handler, HelpPath
from .dispatcher import Dispatcher
from .documentation import Documentation

View File

@ -46,7 +46,7 @@ from uds.core.util import modfinder
from . import processors, log
from .handlers import Handler
from .model import DetailHandler
from .model import DetailHandler, ModelHandler
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -65,7 +65,8 @@ class HandlerNode:
name: str
handler: typing.Optional[type[Handler]]
children: collections.abc.MutableMapping[str, 'HandlerNode']
parent: typing.Optional['HandlerNode']
children: dict[str, 'HandlerNode']
def __str__(self) -> str:
return f'HandlerNode({self.name}, {self.handler}, {self.children})'
@ -77,10 +78,50 @@ class HandlerNode:
"""
Returns a string representation of the tree
"""
ret = f'{" " * level}{self.name} ({self.handler.__name__ if self.handler else "None"})\n'
for child in self.children.values():
ret += child.tree(level + 1)
return ret
if self.handler is None:
return f'{" " * level}|- {self.name}\n' + ''.join(
child.tree(level + 1) for child in self.children.values()
)
ret = f'{" " * level}{self.name} ({self.handler.__name__} {self.full_path()})\n'
if issubclass(self.handler, ModelHandler):
# Add custom_methods
for method in self.handler.custom_methods:
ret += f'{" " * level} |- {method}\n'
# Add detail methods
if self.handler.detail:
for method in self.handler.detail.keys():
ret += f'{" " * level} |- {method}\n'
return ret + ''.join(child.tree(level + 1) for child in self.children.values())
def find_path(self, path: str | list[str]) -> typing.Optional['HandlerNode']:
"""
Returns the node for a given path, or None if not found
"""
if not path or not self.children:
return self
path = path.split('/') if isinstance(path, str) else path
if path[0] not in self.children:
return None
return self.children[path[0]].find_path(path[1:]) # Recursive call
def full_path(self) -> str:
"""
Returns the full path of this node
"""
if self.name == '' or self.parent is None:
return ''
parent_full_path = self.parent.full_path()
if parent_full_path == '':
return self.name
return f'{parent_full_path}/{self.name}'
class Dispatcher(View):
@ -89,7 +130,7 @@ class Dispatcher(View):
"""
# This attribute will contain all paths--> handler relations, filled at Initialized method
services: typing.ClassVar[HandlerNode] = HandlerNode('', None, {})
base_handler_node: typing.ClassVar[HandlerNode] = HandlerNode('', None, None, {})
@method_decorator(csrf_exempt)
def dispatch(
@ -103,35 +144,40 @@ class Dispatcher(View):
del request.session
# Now we extract method and possible variables from path
path: list[str] = kwargs['arguments'].split('/')
# path: list[str] = kwargs['arguments'].split('/')
path = kwargs['arguments']
del kwargs['arguments']
# Transverse service nodes, so we can locate class processing this path
service = Dispatcher.services
full_path_lst: list[str] = []
# Guess content type from content type header (post) or ".xxx" to method
# # Transverse service nodes, so we can locate class processing this path
# service = Dispatcher.services
# full_path_lst: list[str] = []
# # Guess content type from content type header (post) or ".xxx" to method
content_type: str = request.META.get('CONTENT_TYPE', 'application/json').split(';')[0]
while path:
clean_path = path[0]
# Skip empty path elements, so /x/y == /x////y for example (due to some bugs detected on some clients)
if not clean_path:
path = path[1:]
continue
# while path:
# clean_path = path[0]
# # Skip empty path elements, so /x/y == /x////y for example (due to some bugs detected on some clients)
# if not clean_path:
# path = path[1:]
# continue
if clean_path in service.children: # if we have a node for this path, walk down
service = service.children[clean_path]
full_path_lst.append(path[0]) # Add this path to full path
path = path[1:] # Remove first part of path
else:
break # If we don't have a node for this path, we are done
# if clean_path in service.children: # if we have a node for this path, walk down
# service = service.children[clean_path]
# full_path_lst.append(path[0]) # Add this path to full path
# path = path[1:] # Remove first part of path
# else:
# break # If we don't have a node for this path, we are done
full_path = '/'.join(full_path_lst)
logger.debug("REST request: %s (%s)", full_path, content_type)
# full_path = '/'.join(full_path_lst)
handler_node = Dispatcher.base_handler_node.find_path(path)
if not handler_node:
return http.HttpResponseNotFound('Service not found', content_type="text/plain")
logger.debug("REST request: %s (%s)", handler_node, handler_node.full_path())
# Now, service points to the class that will process the request
# We get the '' node, that is the "current" node, and get the class from it
cls: typing.Optional[type[Handler]] = service.handler
cls: typing.Optional[type[Handler]] = handler_node.handler
if not cls:
return http.HttpResponseNotFound('Method not found', content_type="text/plain")
@ -146,14 +192,14 @@ class Dispatcher(View):
return http.HttpResponseNotAllowed(['GET', 'POST', 'PUT', 'DELETE'], content_type="text/plain")
# Path here has "remaining" path, that is, method part has been removed
args = tuple(path)
args = path[len(handler_node.full_path()):].split('/')[1:] # First element is always empty, so we skip it
handler: typing.Optional[Handler] = None
try:
handler = cls(
request,
full_path,
handler_node.full_path(),
http_method,
processor.process_parameters(),
*args,
@ -161,12 +207,12 @@ class Dispatcher(View):
)
operation: collections.abc.Callable[[], typing.Any] = getattr(handler, http_method)
except processors.ParametersException as e:
logger.debug('Path: %s', full_path)
logger.debug('Path: %s', )
logger.debug('Error: %s', e)
log.log_operation(handler, 400, types.log.LogLevel.ERROR)
return http.HttpResponseBadRequest(
f'Invalid parameters invoking {full_path}: {e}',
f'Invalid parameters invoking {handler_node.full_path()}: {e}',
content_type="text/plain",
)
except AttributeError:
@ -179,7 +225,7 @@ class Dispatcher(View):
except Exception:
log.log_operation(handler, 500, types.log.LogLevel.ERROR)
logger.exception('error accessing attribute')
logger.debug('Getting attribute %s for %s', http_method, full_path)
logger.debug('Getting attribute %s for %s', http_method, handler_node.full_path())
return http.HttpResponseServerError('Unexcepected error', content_type="text/plain")
# Invokes the handler's operation, add headers to response and returns
@ -198,7 +244,7 @@ class Dispatcher(View):
),
)
else:
response = processor.get_response(response)
response = processor.get_response(response)
# Set response headers
response['UDS-Version'] = f'{consts.system.VERSION};{consts.system.VERSION_STAMP}'
for k, val in handler.headers().items():
@ -230,7 +276,7 @@ class Dispatcher(View):
log.log_operation(handler, 500, types.log.LogLevel.ERROR)
# Get ecxeption backtrace
trace_back = traceback.format_exc()
logger.error('Exception processing request: %s', full_path)
logger.error('Exception processing request: %s', handler_node.full_path())
for i in trace_back.splitlines():
logger.error('* %s', i)
@ -248,20 +294,20 @@ class Dispatcher(View):
name = type_.name
# Fill the service_node tree with the class
service_node = Dispatcher.services # Root path
service_node = Dispatcher.base_handler_node # Root path
# If path, ensure that the path exists on the tree
if type_.path:
logger.info('Path: /%s/%s', type_.path, name)
for k in type_.path.split('/'):
intern_k = sys.intern(k)
if intern_k not in service_node.children:
service_node.children[intern_k] = HandlerNode(k, None, {})
service_node.children[intern_k] = HandlerNode(k, None, service_node, {})
service_node = service_node.children[intern_k]
else:
logger.info('Path: /%s', name)
if name not in service_node.children:
service_node.children[name] = HandlerNode(name, None, {})
service_node.children[name] = HandlerNode(name, None, service_node, {})
service_node.children[name] = dataclasses.replace(service_node.children[name], handler=type_)

View File

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2025 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from django import http
from django.views.generic.base import View
from .dispatcher import Dispatcher
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class Documentation(View):
def dispatch(
self, request: 'http.request.HttpRequest', *_args: typing.Any, **kwargs: typing.Any
) -> 'http.HttpResponse':
service = Dispatcher.base_handler_node
return http.HttpResponseServerError(f'{service.tree()}', content_type="text/plain")

View File

@ -52,6 +52,12 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class HelpPath(typing.NamedTuple):
"""
Help path class
"""
path: str
help: str
class Handler:
"""
@ -74,7 +80,7 @@ class Handler:
# For implementing help
# A list of pairs of (path, help) for subpaths on this handler
help_paths: typing.ClassVar[list[tuple[str, str]]] = []
help_paths: typing.ClassVar[list[HelpPath]] = []
help_text: typing.ClassVar[str] = 'No help available'
_request: 'ExtendedHttpRequestWithUser' # It's a modified HttpRequest

View File

@ -35,7 +35,7 @@ import datetime
import typing
from uds.core import types
from uds.REST import Handler
from uds.REST import Handler, HelpPath
from uds import models
from uds.core.util.stats import counters
@ -48,7 +48,7 @@ class Stats(Handler):
needs_admin = True
help_paths = [
('', 'Returns the last day usage statistics for all authenticators'),
HelpPath('', 'Returns the last day usage statistics for all authenticators'),
]
help_text = 'Provides access to usage statistics'

View File

@ -44,7 +44,7 @@ from uds.core.util.cache import Cache
from uds.core.util.model import process_uuid, sql_now
from uds.core.types.states import State
from uds.core.util.stats import counters
from uds.REST import Handler
from uds.REST import Handler, HelpPath
logger = logging.getLogger(__name__)
@ -147,15 +147,15 @@ class System(Handler):
needs_staff = True
help_paths = [
('', ''),
('stats/assigned', ''),
('stats/inuse', ''),
('stats/cached', ''),
('stats/complete', ''),
('stats/assigned/<servicePoolId>', ''),
('stats/inuse/<servicePoolId>', ''),
('stats/cached/<servicePoolId>', ''),
('stats/complete/<servicePoolId>', ''),
HelpPath('', ''),
HelpPath('stats/assigned', ''),
HelpPath('stats/inuse', ''),
HelpPath('stats/cached', ''),
HelpPath('stats/complete', ''),
HelpPath('stats/assigned/<servicePoolId>', ''),
HelpPath('stats/inuse/<servicePoolId>', ''),
HelpPath('stats/cached/<servicePoolId>', ''),
HelpPath('stats/complete/<servicePoolId>', ''),
]
help_text = 'Provides system information. Must be admin to access this'

View File

@ -34,7 +34,6 @@ import logging
from django.http import HttpResponse
from django.middleware import csrf
from django.shortcuts import render
from django.template import RequestContext, loader
from django.utils.translation import gettext as _
from uds.core import consts
@ -46,7 +45,7 @@ if typing.TYPE_CHECKING:
from django.http import HttpRequest
@weblogin_required(admin=True)
@weblogin_required(role=consts.Roles.ADMIN)
def index(request: 'HttpRequest') -> HttpResponse:
# Gets csrf token
csrf_token = csrf.get_token(request)
@ -57,19 +56,14 @@ def index(request: 'HttpRequest') -> HttpResponse:
{'csrf_field': consts.auth.CSRF_FIELD, 'csrf_token': csrf_token},
)
@weblogin_required(admin=True)
def tmpl(request: 'HttpRequest', template: str) -> HttpResponse:
try:
t = loader.get_template('uds/admin/tmpl/' + template + ".html")
c = RequestContext(request)
resp = t.render(c.flatten())
except Exception as e:
logger.debug('Exception getting template: %s', e)
resp = _('requested a template that do not exist')
return HttpResponse(resp, content_type="text/plain")
@weblogin_required(admin=True)
def sample(request: 'HttpRequest') -> HttpResponse:
return render(request, 'uds/admin/sample.html')
# from django.template import RequestContext, loader
# @weblogin_required(role=consts.Roles.ADMIN)
# def tmpl(request: 'HttpRequest', template: str) -> HttpResponse:
# try:
# t = loader.get_template('uds/admin/tmpl/' + template + ".html")
# c = RequestContext(request)
# resp = t.render(c.flatten())
# except Exception as e:
# logger.debug('Exception getting template: %s', e)
# resp = _('requested a template that do not exist')
# return HttpResponse(resp, content_type="text/plain")

View File

@ -117,17 +117,14 @@ def root_user() -> models.User:
# Decorator to make easier protect pages that needs to be logged in
def weblogin_required(
admin: typing.Union[bool, typing.Literal['admin']] = False
role: typing.Optional[consts.Roles] = None,
) -> collections.abc.Callable[
[collections.abc.Callable[..., HttpResponse]], collections.abc.Callable[..., HttpResponse]
]:
"""Decorator to set protection to access page
Look for samples at uds.core.web.views
if admin == True, needs admin or staff
if admin == 'admin', needs admin
Args:
admin (bool, optional): If True, needs admin or staff. Is it's "admin" literal, needs admin . Defaults to False (any user).
role (str, optional): If set, needs this role. Defaults to None.
Returns:
collections.abc.Callable[[collections.abc.Callable[..., HttpResponse]], collections.abc.Callable[..., HttpResponse]]: Decorator
@ -135,6 +132,7 @@ def weblogin_required(
Note:
This decorator is used to protect pages that needs to be logged in.
To protect against ajax calls, use `denyNonAuthenticated` instead
Roles as "inclusive", that is, if you set role to USER, it will allow all users that are not anonymous. (USER, STAFF, ADMIN)
"""
def decorator(
@ -151,8 +149,8 @@ def weblogin_required(
if not request.user or not request.authorized:
return weblogout(request)
if admin in (True, 'admin'):
if request.user.is_staff() is False or (admin == 'admin' and not request.user.is_admin):
if role in (consts.Roles.ADMIN, consts.Roles.STAFF):
if request.user.is_staff() is False or (role == consts.Roles.ADMIN and not request.user.is_admin):
return HttpResponseForbidden(_('Forbidden'))
return view_func(request, *args, **kwargs)

View File

@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
import enum
import time
import typing
from datetime import datetime
@ -74,3 +75,16 @@ UNLIMITED: typing.Final[int] = -1
# Constant marking no more names available
NO_MORE_NAMES: typing.Final[str] = 'NO-NAME-ERROR'
class Roles(enum.StrEnum):
"""
Roles for users
"""
ADMIN = 'admin'
STAFF = 'staff'
# Currently not used, but reserved
USER = 'user'
ANONYMOUS = 'anonymous'

View File

@ -267,6 +267,12 @@ urlpatterns = [
custom.custom,
name='custom',
),
# REST API documentation
re_path(
r'^uds/rest/doc/?(?P<doc>.*)$',
REST.Documentation.as_view(),
name='REST.doc',
),
# REST API
re_path(
r'^uds/rest/(?P<arguments>.*)$',

View File

@ -350,7 +350,7 @@ def login(request: types.requests.ExtendedHttpRequest, tag: typing.Optional[str]
@never_cache
@auth.weblogin_required(admin=False)
@auth.weblogin_required()
def logout(request: types.requests.ExtendedHttpRequestWithUser) -> HttpResponse:
auth.log_logout(request)
request.session['restricted'] = False # Remove restricted

View File

@ -31,6 +31,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import logging
import typing
from uds.core import consts
from uds.core.auths.auth import weblogin_required
from uds.core.managers import downloads_manager
from .main import index
@ -45,7 +46,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
@weblogin_required(admin=True)
@weblogin_required(role=consts.Roles.STAFF)
def download(request: 'HttpRequest', download_id: str) -> 'HttpResponse':
"""
Downloadables management

View File

@ -64,13 +64,13 @@ def index(request: HttpRequest) -> HttpResponse:
return response
# Includes a request.session ticket, indicating that
# Launches the service using a ticket (for example, from external portal)
@never_cache
def ticket_launcher(request: HttpRequest) -> HttpResponse:
return index(request)
# Basically, the original /login method, but fixed for modern interface
# Javascript configuration
@never_cache
def js(request: types.requests.ExtendedHttpRequest) -> HttpResponse:
return HttpResponse(content=configjs.uds_js(request), content_type='application/javascript')

View File

@ -60,7 +60,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
@weblogin_required(admin=False)
@weblogin_required()
def transport_own_link(
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
) -> HttpResponse:
@ -105,8 +105,7 @@ def transport_own_link(
return HttpResponse(content=json.dumps(response), content_type='application/json')
# pylint: disable=unused-argument
@weblogin_required(admin=False)
@weblogin_required()
@never_cache
def user_service_enabler(
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
@ -126,7 +125,7 @@ def closer(request: 'ExtendedHttpRequest') -> HttpResponse:
# return HttpResponse('<html><body onload="window.close()"></body></html>')
@weblogin_required(admin=False)
@weblogin_required()
@never_cache
def user_service_status(
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
@ -170,7 +169,7 @@ def user_service_status(
return HttpResponse(json.dumps({'status': status}), content_type='application/json')
@weblogin_required(admin=False)
@weblogin_required()
@never_cache
def action(request: 'ExtendedHttpRequestWithUser', service_id: str, action_string: str) -> HttpResponse:
userservice = UserServiceManager.manager().locate_meta_service(request.user, service_id)