Ported to python >= 3.7 (needs already fixes, but most work done!! :)

This commit is contained in:
Adolfo Gómez García 2019-11-19 13:59:23 +01:00
parent 4a5669bdef
commit ede5d4b790
16 changed files with 152 additions and 198 deletions

View File

@ -61,6 +61,7 @@ USER_KEY = 'uk'
PASS_KEY = 'pk'
ROOT_ID = -20091204 # Any negative number will do the trick
RT = typing.TypeVar('RT')
def getUDSCookie(request: HttpRequest, response: typing.Optional[HttpResponse] = None, force: bool = False) -> str:
'''
@ -74,7 +75,7 @@ def getUDSCookie(request: HttpRequest, response: typing.Optional[HttpResponse] =
else:
cookie = request.COOKIES['uds']
if response is not None and force is True:
if response and force:
response.set_cookie('uds', cookie)
return cookie
@ -82,34 +83,33 @@ def getUDSCookie(request: HttpRequest, response: typing.Optional[HttpResponse] =
def getRootUser() -> User:
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
u = User(id=ROOT_ID, name=GlobalConfig.SUPER_USER_LOGIN.get(True), real_name=_(
'System Administrator'), state=State.ACTIVE, staff_member=True, is_admin=True)
u.manager = Authenticator()
user = User(
id=ROOT_ID,
name=GlobalConfig.SUPER_USER_LOGIN.get(True),
real_name=_('System Administrator'),
state=State.ACTIVE,
staff_member=True,
is_admin=True
)
user.manager = Authenticator()
# Fake overwrite some methods, a bit cheating? maybe? :)
u.getGroups = lambda: [] # type: ignore
u.updateLastAccess = lambda: None # type: ignore
u.logout = lambda: None # type: ignore
return u
@deprecated
def getIp(request):
logger.info('Deprecated IP')
return request.ip
user.getGroups = lambda: [] # type: ignore
user.updateLastAccess = lambda: None # type: ignore
user.logout = lambda: None # type: ignore
return user
# Decorator to make easier protect pages that needs to be logged in
def webLoginRequired(admin: typing.Union[bool, str] = False):
def webLoginRequired(admin: typing.Union[bool, str] = False) -> typing.Callable[[typing.Callable[..., RT]], typing.Callable[..., RT]]:
"""
Decorator to set protection to access page
Look for samples at uds.core.web.views
if admin == True, needs admin or staff
if admin == 'admin', needs admin
"""
def decorator(view_func: typing.Callable):
def decorator(view_func: typing.Callable[..., RT]) -> typing.Callable[..., RT]:
@wraps(view_func, assigned=available_attrs(view_func))
def _wrapped_view(request: HttpRequest, *args, **kwargs):
def _wrapped_view(request: HttpRequest, *args, **kwargs) -> RT:
"""
Wrapped function for decorator
"""
@ -132,14 +132,13 @@ def webLoginRequired(admin: typing.Union[bool, str] = False):
# Decorator to protect pages that needs to be accessed from "trusted sites"
def trustedSourceRequired(view_func: typing.Callable):
def trustedSourceRequired(view_func: typing.Callable[..., RT]) -> typing.Callable[..., RT]:
"""
Decorator to set protection to access page
look for sample at uds.dispatchers.pam
"""
@wraps(view_func)
def _wrapped_view(request: HttpRequest, *args, **kwargs):
def _wrapped_view(request: HttpRequest, *args, **kwargs) -> RT:
"""
Wrapped function for decorator
"""
@ -152,10 +151,10 @@ def trustedSourceRequired(view_func: typing.Callable):
# decorator to deny non authenticated requests
def denyNonAuthenticated(view_func: typing.Callable):
def denyNonAuthenticated(view_func: typing.Callable[..., RT]) -> typing.Callable[..., RT]:
@wraps(view_func)
def _wrapped_view(request: HttpRequest, *args, **kwargs):
def _wrapped_view(request: HttpRequest, *args, **kwargs) -> RT:
if request.user is None:
return HttpResponseForbidden()
return view_func(request, *args, **kwargs)
@ -251,17 +250,17 @@ def authenticateViaCallback(authenticator: Authenticator, params: typing.Any) ->
# If there is no callback for this authenticator...
if authInstance.authCallback == auths.Authenticator.authCallback:
raise auths.Exceptions.InvalidAuthenticatorException()
raise auths.exceptions.InvalidAuthenticatorException()
username = authInstance.authCallback(params, gm)
if username is None or username == '' or gm.hasValidGroups() is False:
raise auths.Exceptions.InvalidUserException('User doesn\'t has access to UDS')
raise auths.exceptions.InvalidUserException('User doesn\'t has access to UDS')
return __registerUser(authenticator, authInstance, username)
def authCallbackUrl(authenticator) -> str:
def authCallbackUrl(authenticator: Authenticator) -> str:
"""
Helper method, so we can get the auth call back url for an authenticator
"""
@ -354,9 +353,8 @@ def authLogLogin(request: HttpRequest, authenticator: Authenticator, userName: s
logStr = 'Logged in'
authLogger.info('|'.join([authenticator.name, userName, request.ip, request.os['OS'], logStr, request.META.get('HTTP_USER_AGENT', 'Undefined')]))
level = (logStr == 'Logged in') and log.INFO or log.ERROR
log.doLog(authenticator, level, 'user {0} has {1} from {2} where os is {3}'.format(userName, logStr,
request.ip, request.os['OS']), log.WEB)
level = log.INFO if logStr == 'Logged in' else log.ERROR
log.doLog(authenticator, level, 'user {} has {} from {} where os is {}'.format(userName, logStr, request.ip, request.os['OS']), log.WEB)
try:
user = authenticator.users.get(name=userName)
@ -365,6 +363,6 @@ def authLogLogin(request: HttpRequest, authenticator: Authenticator, userName: s
pass
def authLogLogout(request: HttpRequest):
log.doLog(request.user.manager, log.INFO, 'user {0} has logged out from {1}'.format(request.user.name, request.ip), log.WEB)
log.doLog(request.user, log.INFO, 'has logged out from {0}'.format(request.ip), log.WEB)
def authLogLogout(request: HttpRequest) -> None:
log.doLog(request.user.manager, log.INFO, 'user {} has logged out from {}'.format(request.user.name, request.ip), log.WEB)
log.doLog(request.user, log.INFO, 'has logged out from {}'.format(request.ip), log.WEB)

View File

@ -26,7 +26,6 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
@ -42,12 +41,12 @@ logger = logging.getLogger(__name__)
class AssignedAndUnused(Job):
frecuency = 631
frecuency_cfg = GlobalConfig.CHECK_UNUSED_TIME
frecuency = 61 # Once every minute, but look for GlobalConfig.CHECK_UNUSED_TIME since
# frecuency_cfg = GlobalConfig.CHECK_UNUSED_TIME
friendly_name = 'Unused services checker'
def run(self):
since_state = getSqlDatetime() - timedelta(seconds=self.frecuency)
since_state = getSqlDatetime() - timedelta(seconds=GlobalConfig.CHECK_UNUSED_TIME)
for ds in ServicePool.objects.all():
# Skips checking deployed services in maintenance mode or ignores assigned and unused
if ds.isInMaintenance() is True or ds.ignores_unused:

View File

@ -162,8 +162,8 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
This is done so we can check non existing or non blocked users (state != Active, or do not exists)
"""
try:
u: User = self.users.get(name=username)
return State.isActive(u.state)
usr: 'User' = self.users.get(name=username)
return State.isActive(usr.state)
except Exception:
return falseIfNotExists

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -30,44 +29,19 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _, ugettext
from django import forms
from django.utils.safestring import mark_safe
from uds.models import Authenticator
import six
import logging
from django.utils.translation import ugettext_lazy as _
from django import forms
from uds.models import Authenticator
logger = logging.getLogger(__name__)
# pylint: disable=no-value-for-parameter, unexpected-keyword-arg
class CustomSelect(forms.Select):
bootstrap = False
def render(self, name, value, attrs=None, **kwargs):
if len(self.choices) < 2:
visible = ' style="display: none;"'
else:
visible = ''
res = '<select id="id_{0}" name="{0}" class="selectpicker show-menu-arrow" data-header="{1}" data-size="8" data-width="100%" >'.format(name, ugettext('Select authenticator'))
for choice in self.choices:
res += '<option value="{0}">{1}</option>'.format(choice[0], choice[1])
res += '</select>'
return mark_safe('<div class="form-group"{0}><label>'.format(visible) + six.text_type(_('authenticator')) + '</label>' + res + '</div>')
class LoginForm(forms.Form):
user = forms.CharField(label=_('Username'), max_length=64, widget=forms.TextInput())
password = forms.CharField(label=_('Password'), widget=forms.PasswordInput(attrs={'title': _('Password')}), required=False)
authenticator = forms.ChoiceField(label=_('Authenticator'), choices=(), widget=CustomSelect(), required=False)
standard = forms.CharField(widget=forms.HiddenInput(), required=False)
nonStandard = forms.CharField(widget=forms.HiddenInput(), required=False)
logouturl = forms.CharField(widget=forms.HiddenInput(), required=False)
authenticator = forms.ChoiceField(label=_('Authenticator'), choices=(), required=False)
def __init__(self, *args, **kwargs):
# If an specified login is passed in, retrieve it & remove it from kwargs dict
@ -75,26 +49,16 @@ class LoginForm(forms.Form):
if 'tag' in kwargs:
del kwargs['tag']
logger.debug('tag is "{0}"'.format(tag))
# Parent init
super(LoginForm, self).__init__(*args, **kwargs)
choices = []
nonStandard = []
standard = []
auths = Authenticator.getByTag(tag)
for a in auths:
if a.getType() is None:
for a in Authenticator.getByTag(tag):
if not a.getType(): # Not existing manager for the auth?
continue
if a.getType().isCustom() and tag == 'disabled':
continue
choices.append((a.uuid, a.name))
if a.getType().isCustom():
nonStandard.append(a.uuid)
else:
standard.append(a.uuid)
self.fields['authenticator'].choices = choices
self.fields['nonStandard'].initial = ','.join(nonStandard)
self.fields['standard'].initial = ','.join(standard)

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Virtual Cable S.L.
# Copyright (c) 2018-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
# Returns:
# (None, ErroString) if error
# (None, NumericError) if errorview redirection
# (User, password_string) if all if fine
# (User, password_string) if all is ok
def checkLogin( # pylint: disable=too-many-branches, too-many-statements
request: 'HttpRequest',
form: 'LoginForm',

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Virtual Cable S.L.
# Copyright (c) 2018-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -31,8 +30,8 @@
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import json
import logging
import typing
from django import template
from django.conf import settings
@ -43,14 +42,15 @@ from django.templatetags.static import static
from uds.REST import AUTH_TOKEN_HEADER
from uds.REST.methods.client import CLIENT_VERSION
from uds.core.managers import downloadsManager
from uds.core.util.config import GlobalConfig
from uds.core import VERSION, VERSION_STAMP
from uds.models import Authenticator, Image
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports
logger = logging.getLogger(__name__)
register = template.Library()
@ -59,11 +59,11 @@ CSRF_FIELD = 'csrfmiddlewaretoken'
@register.simple_tag(takes_context=True)
def udsJs(request):
def udsJs(request: 'HttpRequest') -> str:
auth_host = request.META.get('HTTP_HOST') or request.META.get('SERVER_NAME') or 'auth_host' # Last one is a placeholder in case we can't locate host name
profile = {
'user': None if request.user is None else request.user.name,
'user': None if not request.user else request.user.name,
'role': 'staff' if request.user and request.user.staff_member else 'user',
}
@ -84,7 +84,7 @@ def udsJs(request):
authenticators = Authenticator.objects.all()
# the auths for client
def getAuth(auth):
def getAuthInfo(auth: Authenticator):
theType = auth.getType()
return {
'id': auth.uuid,
@ -99,7 +99,7 @@ def udsJs(request):
'version_stamp': VERSION_STAMP,
'language': get_language(),
'available_languages': [{'id': k, 'name': gettext(v)} for k, v in settings.LANGUAGES],
'authenticators': [getAuth(auth) for auth in authenticators if auth.getType()],
'authenticators': [getAuthInfo(auth) for auth in authenticators if auth.getType()],
'os': request.os['OS'],
'csrf_field': CSRF_FIELD,
'csrf': csrf_token,
@ -142,7 +142,7 @@ def udsJs(request):
)
]
actors = []
actors: typing.List[typing.Dict[str, str]] = []
if profile['role'] == 'staff': # Add staff things
# If is admin (informational, REST api checks users privileges anyway...)
@ -156,7 +156,7 @@ def udsJs(request):
config['urls']['admin'] = reverse('uds.admin.views.index')
config['urls']['rest'] = reverse('REST', kwargs={'arguments': ''})
errors = []
errors: typing.List = []
if 'errors' in request.session:
errors = request.session['errors']
del request.session['errors']

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -45,7 +44,7 @@ from uds.models import ServicePool, Transport, UserService, Authenticator
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports
from django.http import HttpRequest, HttpResponse # pylint: disable=ungrouped-imports
logger = logging.getLogger(__name__)
@ -93,14 +92,14 @@ strings = [
]
def errorString(errorId) -> str:
def errorString(errorId: int) -> str:
errorId = int(errorId)
if errorId < len(strings):
return strings[errorId]
return strings[0]
def errorView(request: 'HttpRequest', errorCode: int) -> None:
def errorView(request: 'HttpRequest', errorCode: int) -> HttpResponseRedirect:
errorCode = int(errorCode)
code = (errorCode >> 8) & 0xFF
errorCode = errorCode & 0xFF
@ -115,7 +114,7 @@ def errorView(request: 'HttpRequest', errorCode: int) -> None:
return HttpResponseRedirect(reverse('page.error', kwargs={'error': errStr}))
def exceptionView(request, exception):
def exceptionView(request: 'HttpRequest', exception: Exception) -> HttpResponseRedirect:
"""
Tries to render an error page with error information
"""
@ -125,7 +124,7 @@ def exceptionView(request, exception):
logger.debug(traceback.format_exc())
try:
raise exception
raise exception # Raise it so we can "catch" and redirect
except UserService.DoesNotExist:
return errorView(request, ERR_USER_SERVICE_NOT_FOUND)
except ServicePool.DoesNotExist:
@ -153,11 +152,9 @@ def exceptionView(request, exception):
# raise e
def error(request, error):
def error(request: 'HttpRequest', err: str) -> 'HttpResponse':
"""
Error view, responsible of error display
:param request:
:param idError:
"""
return render(request, 'uds/modern/index.html', {})

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -206,7 +206,7 @@ def getServicesData(request: 'HttpRequest') -> typing.Dict[str, typing.Any]: #
'to_be_replaced_text': tbrt,
})
logger.debug('Services: {0}'.format(services))
logger.debug('Services: %s', services)
# Sort services and remove services with no transports...
services = [s for s in sorted(services, key=lambda s: s['name'].upper()) if s['transports']]

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -29,15 +29,16 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from django.urls import reverse
from django.http import HttpResponse, HttpResponseRedirect, HttpResponsePermanentRedirect
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, HttpResponsePermanentRedirect
from django.utils.translation import ugettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt
import uds.web.util.errors as errors
from uds.core.auths.exceptions import InvalidUserException
from uds.core import auths
from uds.core.auths.auth import webLogin, webLogout, authenticateViaCallback, authLogLogin, getUDSCookie
from uds.core.managers import userServiceManager, cryptoManager
from uds.core.services.exceptions import ServiceNotReadyError
@ -50,11 +51,9 @@ from uds.models import TicketStore
logger = logging.getLogger(__name__)
__updated__ = '2019-02-08'
@csrf_exempt
def authCallback(request, authName):
def authCallback(request: HttpRequest, authName: str) -> HttpResponse:
"""
This url is provided so external SSO authenticators can get an url for
redirecting back the users.
@ -62,7 +61,6 @@ def authCallback(request, authName):
This will invoke authCallback of the requested idAuth and, if this represents
an authenticator that has an authCallback
"""
from uds.core import auths
try:
authenticator = Authenticator.objects.get(name=authName)
params = request.GET.copy()
@ -73,7 +71,7 @@ def authCallback(request, authName):
# params['_session'] = request.session
# params['_user'] = request.user
logger.debug('Auth callback for {0} with params {1}'.format(authenticator, params.keys()))
logger.debug('Auth callback for %s with params %s', authenticator, params.keys())
user = authenticateViaCallback(authenticator, params)
@ -81,7 +79,7 @@ def authCallback(request, authName):
if user is None:
authLogLogin(request, authenticator, '{0}'.format(params), 'Invalid at auth callback')
raise auths.Exceptions.InvalidUserException()
raise auths.exceptions.InvalidUserException()
response = HttpResponseRedirect(reverse('Index'))
@ -91,9 +89,9 @@ def authCallback(request, authName):
# It will only detect java, and them redirect to Java
return response
except auths.Exceptions.Redirect as e:
except auths.exceptions.Redirect as e:
return HttpResponseRedirect(request.build_absolute_uri(str(e)))
except auths.Exceptions.Logout as e:
except auths.exceptions.Logout as e:
return webLogout(request, request.build_absolute_uri(str(e)))
except Exception as e:
logger.exception('authCallback')
@ -104,14 +102,13 @@ def authCallback(request, authName):
@csrf_exempt
def authInfo(request, authName):
def authInfo(request: 'HttpRequest', authName: str) -> HttpResponse:
"""
This url is provided so authenticators can provide info (such as SAML metadata)
This will invoke getInfo on requested authName. The search of the authenticator is done
by name, so it's easier to access from external sources
"""
from uds.core import auths
try:
logger.debug('Getting info for %s', authName)
authenticator = Authenticator.objects.get(name=authName)
@ -135,7 +132,7 @@ def authInfo(request, authName):
# Gets the javascript from the custom authtenticator
@never_cache
def customAuth(request, idAuth):
def customAuth(request: 'HttpRequest', idAuth: str) -> HttpResponse:
res = ''
try:
try:
@ -143,7 +140,7 @@ def customAuth(request, idAuth):
except Authenticator.DoesNotExist:
auth = Authenticator.objects.get(pk=idAuth)
res = auth.getInstance().getJavascript(request)
if res is None:
if not res:
res = ''
except Exception:
logger.exception('customAuth')
@ -152,7 +149,7 @@ def customAuth(request, idAuth):
@never_cache
def ticketAuth(request, ticketId):
def ticketAuth(request: 'HttpRequest', ticketId: str) -> HttpResponse: # pylint: disable=too-many-locals,too-many-branches,too-many-statements
"""
Used to authenticate an user via a ticket
"""
@ -170,25 +167,25 @@ def ticketAuth(request, ticketId):
transport = data['transport']
except Exception:
logger.error('Ticket stored is not valid')
raise InvalidUserException()
raise auths.exceptions.InvalidUserException()
auth = Authenticator.objects.get(uuid=auth)
# If user does not exists in DB, create it right now
# Add user to groups, if they exists...
grps = []
grps: typing.List = []
for g in groups:
try:
grps.append(auth.groups.get(uuid=g))
except Exception:
logger.debug('Group list has changed since ticket assignment')
if len(grps) == 0:
if not grps:
logger.error('Ticket has no valid groups')
raise Exception('Invalid ticket authentication')
usr = auth.getOrCreateUser(username, realname)
if usr is None or State.isActive(usr.state) is False: # If user is inactive, raise an exception
raise InvalidUserException()
raise auths.exceptions.InvalidUserException()
# Add groups to user (replace existing groups)
usr.groups.set(grps)
@ -199,15 +196,15 @@ def ticketAuth(request, ticketId):
request.user = usr # Temporarily store this user as "authenticated" user, next requests will be done using session
request.session['ticket'] = '1' # Store that user access is done using ticket
logger.debug("Service & transport: {}, {}".format(servicePool, transport))
logger.debug("Service & transport: %s, %s", servicePool, transport)
for v in ServicePool.objects.all():
logger.debug("{} {}".format(v.uuid, v.name))
logger.debug("%s %s", v.uuid, v.name)
# Check if servicePool is part of the ticket
if servicePool is not None:
if servicePool:
# If service pool is in there, also is transport
res = userServiceManager().getService(request.user, request.os, request.ip, 'F' + servicePool, transport, False)
_x, userService, _x, transport, _x = res
_, userService, _, transport, _ = res
transportInstance = transport.getInstance()
if transportInstance.ownLink is True:
@ -215,7 +212,7 @@ def ticketAuth(request, ticketId):
else:
link = html.udsAccessLink(request, 'A' + userService.uuid, transport.uuid)
request.session['launch'] = link;
request.session['launch'] = link
response = HttpResponseRedirect(reverse('page.ticket.launcher'))
else:
response = HttpResponsePermanentRedirect(reverse('page.index'))

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Virtual Cable S.L.
# Copyright (c) 2018-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

@ -29,17 +29,21 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from uds.core.auths.auth import webLoginRequired
from uds.core.managers import downloadsManager
from .modern import index
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest, HttpResponse # pylint: disable=ungrouped-imports
logger = logging.getLogger(__name__)
@webLoginRequired(admin=True)
def download(request, idDownload):
def download(request: 'HttpRequest', idDownload: str) -> 'HttpResponse':
"""
Downloadables management
"""

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,7 +28,8 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from __future__ import unicode_literals
import logging
import typing
from django.http import HttpResponse
from django.views.decorators.cache import cache_page
@ -37,16 +38,15 @@ from uds.core.ui.images import DEFAULT_IMAGE
from uds.core.util.model import processUuid
from uds.models import Image
import logging
logger = logging.getLogger(__name__)
__updated__ = '2016-02-15'
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports
@cache_page(3600, key_prefix='img', cache='memory')
def image(request, idImage):
def image(request: 'HttpRequest', idImage: str) -> 'HttpResponse':
try:
icon = Image.objects.get(uuid=processUuid(idImage))
return icon.imageResponse()

View File

@ -29,55 +29,49 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from django.http import HttpRequest, HttpResponse, JsonResponse, HttpResponseRedirect
from django.urls import reverse
from uds.web.util.errors import errorView
from uds.core.auths.auth import (
getUDSCookie,
denyNonAuthenticated,
webLoginRequired,
authLogLogout,
webLogout,
)
from uds.web.util import errors
from uds.core.auths import auth
from uds.web.forms.LoginForm import LoginForm
from uds.web.util.authentication import checkLogin
from uds.web.util.services import getServicesData
from uds.web.util import configjs
logger = logging.getLogger(__name__)
def index(request):
def index(request: HttpRequest) -> HttpResponse:
# return errorView(request, 1)
response = render(request, 'uds/modern/index.html', {})
logger.debug('Session expires at %s', request.session.get_expiry_date())
# Ensure UDS cookie is present
getUDSCookie(request, response)
auth.getUDSCookie(request, response)
return response
# Basically, the original /login method, but fixed for modern interface
def login(request, tag=None):
from uds.web.forms.LoginForm import LoginForm
from uds.web.util.authentication import checkLogin
from uds.core.auths.auth import webLogin
from django.http import HttpResponseRedirect
def login(request: HttpRequest, tag: typing.Optional[str] = None) -> HttpResponse:
# Default empty form
if request.method == 'POST':
form = LoginForm(request.POST, tag=tag)
user, data = checkLogin(request, form, tag)
if user:
response = HttpResponseRedirect(reverse('page.index'))
webLogin(request, response, user, data) # data is user password here
auth.webLogin(request, response, user, data) # data is user password here
else:
# If error is numeric, redirect...
# Error, set error on session for process for js
if isinstance(data, int):
return errorView(request, data)
return errors.errorView(request, data)
request.session['errors'] = [data]
return index(request)
@ -87,19 +81,19 @@ def login(request, tag=None):
return response
@webLoginRequired(admin=False)
def logout(request):
authLogLogout(request)
@auth.webLoginRequired(admin=False)
def logout(request: HttpRequest) -> HttpResponse:
auth.authLogLogout(request)
logoutUrl = request.user.logout()
if logoutUrl is None:
logoutUrl = request.session.get('logouturl', None)
return webLogout(request, logoutUrl)
return auth.webLogout(request, logoutUrl)
def js(request):
def js(request: HttpRequest) -> HttpResponse:
return HttpResponse(content=configjs.udsJs(request), content_type='application/javascript')
@denyNonAuthenticated
def servicesData(request):
@auth.denyNonAuthenticated
def servicesData(request: HttpRequest) -> HttpResponse:
return JsonResponse(getServicesData(request))

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,7 +28,9 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from __future__ import unicode_literals
import json
import logging
import typing
from django.utils.translation import ugettext as _
from django.http import HttpResponse
@ -44,23 +46,23 @@ from uds.core.util import html, log
from uds.core.services.exceptions import ServiceNotReadyError, MaxServicesReachedError, ServiceAccessDeniedByCalendar
from uds.web.util import errors
from uds.web.util import services
from uds.web.util import services
import json
import logging
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports
logger = logging.getLogger(__name__)
__updated__ = '2019-02-08'
@webLoginRequired(admin=False)
def transportOwnLink(request, idService, idTransport):
def transportOwnLink(request: 'HttpRequest', idService: str, idTransport: str):
try:
res = userServiceManager().getService(request.user, request.os, request.ip, idService, idTransport)
ip, userService, iads, trans, itrans = res # @UnusedVariable
ip, userService, iads, trans, itrans = res # pylint: disable=unused-variable
# This returns a response object in fact
return itrans.getLink(userService, trans, ip, request.os, request.user, webPassword(request), request)
if itrans and ip:
return itrans.getLink(userService, trans, ip, request.os, request.user, webPassword(request), request)
except ServiceNotReadyError as e:
return errors.exceptionView(request, e)
except Exception as e:
@ -68,11 +70,11 @@ def transportOwnLink(request, idService, idTransport):
return errors.exceptionView(request, e)
# Will never reach this
raise RuntimeError('Unreachable point reached!!!')
return errors.errorView(request, errors.UNKNOWN_ERROR)
@cache_page(3600, key_prefix='img', cache='memory')
def transportIcon(request, idTrans):
def transportIcon(request: 'HttpRequest', idTrans: str) -> HttpResponse:
try:
transport: Transport = Transport.objects.get(uuid=processUuid(idTrans))
return HttpResponse(transport.getInstance().icon(), content_type='image/png')
@ -81,7 +83,7 @@ def transportIcon(request, idTrans):
@cache_page(3600, key_prefix='img', cache='memory')
def serviceImage(request, idImage):
def serviceImage(request: 'HttpRequest', idImage: str) -> HttpResponse:
try:
icon = Image.objects.get(uuid=processUuid(idImage))
return icon.imageResponse()
@ -97,7 +99,7 @@ def serviceImage(request, idImage):
@webLoginRequired(admin=False)
@never_cache
def userServiceEnabler(request, idService, idTransport):
def userServiceEnabler(request: 'HttpRequest', idService: str, idTransport: str) -> HttpResponse:
# Maybe we could even protect this even more by limiting referer to own server /? (just a meditation..)
logger.debug('idService: %s, idTransport: %s', idService, idTransport)
url = ''
@ -110,7 +112,7 @@ def userServiceEnabler(request, idService, idTransport):
scrambler = cryptoManager().randomString(32)
password = cryptoManager().symCrypt(webPassword(request), scrambler)
_x, userService, _x, trans, _x = res
userService, trans = res[1], res[3]
data = {
'service': 'A' + userService.uuid,
@ -127,10 +129,10 @@ def userServiceEnabler(request, idService, idTransport):
# Not ready, show message and return to this page in a while
error += ' (code {0:04X})'.format(e.code)
except MaxServicesReachedError:
logger.info('Number of service reached MAX for service pool "{}"'.format(idService))
logger.info('Number of service reached MAX for service pool "%s"', idService)
error = errors.errorString(errors.MAX_SERVICES_REACHED)
except ServiceAccessDeniedByCalendar:
logger.info('Access tried to a calendar limited access pool "{}"'.format(idService))
logger.info('Access tried to a calendar limited access pool "%s"', idService)
error = errors.errorString(errors.SERVICE_CALENDAR_DENIED)
except Exception as e:
logger.exception('Error')
@ -144,17 +146,17 @@ def userServiceEnabler(request, idService, idTransport):
content_type='application/json'
)
def closer(request):
def closer(request: 'HttpRequest') -> HttpResponse:
return HttpResponse('<html><body onload="window.close()"></body></html>')
@webLoginRequired(admin=False)
@never_cache
def action(request, idService, action):
def action(request: 'HttpRequest', idService: str, actionString: str) -> HttpResponse:
userService = userServiceManager().locateUserService(request.user, idService, create=False)
response = None
rebuild = False
response: typing.Any = None
rebuild: bool = False
if userService:
if action == 'release' and userService.deployed_service.allow_users_remove:
if actionString == 'release' and userService.deployed_service.allow_users_remove:
rebuild = True
log.doLog(
userService.deployed_service,
@ -164,7 +166,7 @@ def action(request, idService, action):
)
userServiceManager().requestLogoff(userService)
userService.release()
elif (action == 'reset'
elif (actionString == 'reset'
and userService.deployed_service.allow_users_reset
and userService.deployed_service.service.getType().canReset):
rebuild = True
@ -180,7 +182,6 @@ def action(request, idService, action):
if rebuild:
# Rebuild services data, but return only "this" service
for v in services.getServicesData(request)['services']:
logger.debug('{} ==? {}'.format(v['id'], idService))
if v['id'] == idService:
response = v
break