diff --git a/server/src/uds/core/auths/auth.py b/server/src/uds/core/auths/auth.py index 8b42e4a7..2eea7f8e 100644 --- a/server/src/uds/core/auths/auth.py +++ b/server/src/uds/core/auths/auth.py @@ -123,15 +123,24 @@ def getRootUser() -> User: # Decorator to make easier protect pages that needs to be logged in def webLoginRequired( - admin: typing.Union[bool, str] = False + admin: typing.Union[bool, typing.Literal['admin']] = False ) -> typing.Callable[ [typing.Callable[..., HttpResponse]], typing.Callable[..., HttpResponse] ]: - """ - Decorator to set protection to access page + """Decorator to set protection to access page Look for samples at uds.core.web.views if admin == True, needs admin or staff if admin == 'admin', needs admin + + Args: + admin (bool, optional): If True, needs admin or staff. Is it's "admin" literal, needs admin . Defaults to False (any user). + + Returns: + typing.Callable[[typing.Callable[..., HttpResponse]], typing.Callable[..., HttpResponse]]: Decorator + + Note: + This decorator is used to protect pages that needs to be logged in. + To protect against ajax calls, use `denyNonAuthenticated` instead """ def decorator( @@ -148,7 +157,7 @@ def webLoginRequired( if not request.user or not request.authorized: return HttpResponseRedirect(reverse('page.login')) - if admin is True or admin == 'admin': # bool or string "admin" + if admin in (True, 'admin'): if request.user.isStaff() is False or ( admin == 'admin' and not request.user.is_admin ): @@ -172,7 +181,6 @@ def trustedSourceRequired( ) -> typing.Callable[..., RT]: """ Decorator to set protection to access page - look for sample at uds.dispatchers.pam """ @wraps(view_func) @@ -194,6 +202,8 @@ def trustedSourceRequired( # decorator to deny non authenticated requests +# The difference with webLoginRequired is that this one does not redirect to login page +# it's designed to be used in ajax calls mainly def denyNonAuthenticated( view_func: typing.Callable[..., RT] ) -> typing.Callable[..., RT]: diff --git a/server/src/uds/models/ticket_store.py b/server/src/uds/models/ticket_store.py index 22dc3325..0b533522 100644 --- a/server/src/uds/models/ticket_store.py +++ b/server/src/uds/models/ticket_store.py @@ -173,6 +173,7 @@ class TicketStore(UUIDModel): uuid: str, secure: bool = False, owner: typing.Optional[str] = None, + checkFnc: typing.Callable[[typing.Any], bool] = lambda x: True, **kwargs: typing.Any, ) -> None: try: @@ -189,6 +190,10 @@ class TicketStore(UUIDModel): dct = pickle.loads(data) + # invoke check function + if checkFnc(dct) is False: + raise TicketStore.InvalidTicket('Validation failed') + for k, v in kwargs.items(): if v is not None: dct[k] = v diff --git a/server/src/uds/models/user_service.py b/server/src/uds/models/user_service.py index ebcf9f53..9e44ba03 100644 --- a/server/src/uds/models/user_service.py +++ b/server/src/uds/models/user_service.py @@ -71,10 +71,10 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods # The reference to deployed service is used to accelerate the queries for different methods, in fact its redundant cause we can access to the deployed service # through publication, but queries are much more simple - deployed_service: 'models.ForeignKey["UserService", ServicePool]' = models.ForeignKey( + deployed_service: 'models.ForeignKey[ServicePool]' = models.ForeignKey( ServicePool, on_delete=models.CASCADE, related_name='userServices' ) - publication: 'models.ForeignKey["UserService", ServicePoolPublication]' = ( + publication: 'models.ForeignKey[ServicePoolPublication|None]' = ( models.ForeignKey( ServicePoolPublication, on_delete=models.CASCADE, diff --git a/server/src/uds/transports/HTML5RDP/html5rdp.py b/server/src/uds/transports/HTML5RDP/html5rdp.py index c39f16c4..3f9ae307 100644 --- a/server/src/uds/transports/HTML5RDP/html5rdp.py +++ b/server/src/uds/transports/HTML5RDP/html5rdp.py @@ -35,14 +35,10 @@ import logging import typing from urllib.parse import urlencode -from uds.models.util import getSqlDatetime - from django.utils.translation import ugettext_noop as _ from uds.core.ui import gui - from uds.core import transports - from uds.core.util import os_detector as OsDetector from uds.core.managers import cryptoManager from uds import models @@ -457,7 +453,7 @@ class HTML5RDPTransport(transports.Transport): 'create-drive-path': 'true', 'ticket-info': { 'userService': userService.uuid, - 'user': userService.user.uuid if userService.user else '', + 'user': user.uuid, }, } @@ -474,7 +470,7 @@ class HTML5RDPTransport(transports.Transport): + '_' + sanitize(user.name) + '/' - + getSqlDatetime().strftime('%Y%m%d-%H%M') + + models.getSqlDatetime().strftime('%Y%m%d-%H%M') ) params['create-recording-path'] = 'true' diff --git a/server/src/uds/web/views/modern.py b/server/src/uds/web/views/modern.py index 40364d29..2f871ab6 100644 --- a/server/src/uds/web/views/modern.py +++ b/server/src/uds/web/views/modern.py @@ -308,7 +308,7 @@ def update_transport_ticket(request: ExtendedHttpRequestWithUser, idTicket: str, if request.method == 'POST': # Get request body as json data = json.loads(request.body) - + # Update username andd password in ticket username = data.get('username', None) or None # None if not present password = data.get('password', None) or None # If password is empty, set it to None @@ -317,8 +317,21 @@ def update_transport_ticket(request: ExtendedHttpRequestWithUser, idTicket: str, if password: password = cryptoManager().symCrypt(password, scrambler) + def checkValidTicket(data: typing.Mapping[str, typing.Any]) -> bool: + if 'ticket-info' not in data: + return True + try: + user = models.User.objects.get(uuid=data['ticket-info'].get('user', None)) + if request.user == user: + return True + except models.User.DoesNotExist: + pass + return False + + models.TicketStore.update( uuid=idTicket, + checkFnc=checkValidTicket, username=username, password=password, domain=domain, diff --git a/server/src/uds/web/views/service.py b/server/src/uds/web/views/service.py index 3b83f039..b355dea3 100644 --- a/server/src/uds/web/views/service.py +++ b/server/src/uds/web/views/service.py @@ -62,12 +62,14 @@ def transportOwnLink( ): response: typing.MutableMapping[str, typing.Any] = {} + # If userService is not owned by user, will raise an exception + # For type checkers to "be happy" try: res = userServiceManager().getService( request.user, request.os, request.ip, idService, idTransport ) - ip, userService, iads, trans, itrans = res # pylint: disable=unused-variable + ip, userService, iads, trans, itrans = res # This returns a response object in fact if itrans and ip: response = {