mirror of
https://github.com/dkmstr/openuds.git
synced 2024-12-24 21:34:41 +03:00
Adding tests
This commit is contained in:
parent
0f587a4ec1
commit
91921537ce
@ -40,5 +40,5 @@ from .serializable import Serializable
|
||||
from .module import Module
|
||||
|
||||
|
||||
VERSION = '3.x.x-DEVEL'
|
||||
VERSION = '4.x.x-DEVEL'
|
||||
VERSION_STAMP = '{}-DEVEL'.format(time.strftime("%Y%m%d"))
|
||||
|
@ -56,7 +56,7 @@ from uds.core.util.state import State
|
||||
from uds.core.managers import cryptoManager
|
||||
from uds.core.auths import Authenticator as AuthenticatorInstance
|
||||
|
||||
from uds.models import User, Authenticator
|
||||
from uds import models
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
@ -76,7 +76,7 @@ RT = typing.TypeVar('RT')
|
||||
|
||||
|
||||
class AuthResult(typing.NamedTuple):
|
||||
user: typing.Optional[User] = None
|
||||
user: typing.Optional[models.User] = None
|
||||
url: typing.Optional[str] = None
|
||||
|
||||
|
||||
@ -102,14 +102,14 @@ def getUDSCookie(
|
||||
return cookie
|
||||
|
||||
|
||||
def getRootUser() -> User:
|
||||
def getRootUser() -> models.User:
|
||||
"""
|
||||
Returns an user not in DB that is ROOT for the platform
|
||||
|
||||
Returns:
|
||||
User: [description]
|
||||
"""
|
||||
user = User(
|
||||
user = models.User(
|
||||
id=ROOT_ID,
|
||||
name=GlobalConfig.SUPER_USER_LOGIN.get(True),
|
||||
real_name=_('System Administrator'),
|
||||
@ -207,7 +207,7 @@ def denyNonAuthenticated(
|
||||
|
||||
|
||||
def __registerUser(
|
||||
authenticator: Authenticator,
|
||||
authenticator: models.Authenticator,
|
||||
authInstance: AuthenticatorInstance,
|
||||
username: str,
|
||||
request: 'ExtendedHttpRequest',
|
||||
@ -245,7 +245,7 @@ def __registerUser(
|
||||
def authenticate(
|
||||
username: str,
|
||||
password: str,
|
||||
authenticator: Authenticator,
|
||||
authenticator: models.Authenticator,
|
||||
request: 'ExtendedHttpRequest',
|
||||
useInternalAuthenticate: bool = False,
|
||||
) -> AuthResult:
|
||||
@ -306,7 +306,7 @@ def authenticate(
|
||||
|
||||
|
||||
def authenticateViaCallback(
|
||||
authenticator: Authenticator,
|
||||
authenticator: models.Authenticator,
|
||||
params: typing.Any,
|
||||
request: 'ExtendedHttpRequestWithUser',
|
||||
) -> AuthResult:
|
||||
@ -356,14 +356,14 @@ def authenticateViaCallback(
|
||||
raise auths.exceptions.InvalidUserException('User doesn\'t has access to UDS')
|
||||
|
||||
|
||||
def authCallbackUrl(authenticator: Authenticator) -> str:
|
||||
def authCallbackUrl(authenticator: models.Authenticator) -> str:
|
||||
"""
|
||||
Helper method, so we can get the auth call back url for an authenticator
|
||||
"""
|
||||
return reverse('page.auth.callback', kwargs={'authName': authenticator.name})
|
||||
|
||||
|
||||
def authInfoUrl(authenticator: typing.Union[str, bytes, Authenticator]) -> str:
|
||||
def authInfoUrl(authenticator: typing.Union[str, bytes, models.Authenticator]) -> str:
|
||||
"""
|
||||
Helper method, so we can get the info url for an authenticator
|
||||
"""
|
||||
@ -380,7 +380,7 @@ def authInfoUrl(authenticator: typing.Union[str, bytes, Authenticator]) -> str:
|
||||
def webLogin(
|
||||
request: 'ExtendedHttpRequest',
|
||||
response: typing.Optional[HttpResponse],
|
||||
user: User,
|
||||
user: models.User,
|
||||
password: str,
|
||||
) -> bool:
|
||||
"""
|
||||
@ -444,7 +444,7 @@ def webLogout(
|
||||
by django in regular basis.
|
||||
"""
|
||||
if exit_url is None:
|
||||
exit_url = request.build_absolute_uri(reverse('page.logout'))
|
||||
exit_url = reverse('page.login')
|
||||
# exit_url = GlobalConfig.LOGIN_URL.get()
|
||||
# if GlobalConfig.REDIRECT_TO_HTTPS.getBool() is True:
|
||||
# exit_url = exit_url.replace('http://', 'https://')
|
||||
@ -476,7 +476,7 @@ def webLogout(
|
||||
|
||||
def authLogLogin(
|
||||
request: 'ExtendedHttpRequest',
|
||||
authenticator: Authenticator,
|
||||
authenticator: models.Authenticator,
|
||||
userName: str,
|
||||
logStr: str = '',
|
||||
) -> None:
|
||||
|
@ -103,9 +103,9 @@ class LogManager(metaclass=singleton.Singleton):
|
||||
qs = models.Log.objects.filter(owner_id=owner_id, owner_type=owner_type)
|
||||
# First, ensure we do not have more than requested logs, and we can put one more log item
|
||||
if qs.count() >= GlobalConfig.MAX_LOGS_PER_ELEMENT.getInt():
|
||||
for i in qs.order_by('-created',)[
|
||||
GlobalConfig.MAX_LOGS_PER_ELEMENT.getInt() - 1 : # type: ignore # Slicing is not supported by pylance right now
|
||||
]:
|
||||
for i in qs.order_by(
|
||||
'-created',
|
||||
)[GlobalConfig.MAX_LOGS_PER_ELEMENT.getInt() - 1 :]:
|
||||
i.delete()
|
||||
|
||||
if avoidDuplicates:
|
||||
@ -169,7 +169,10 @@ class LogManager(metaclass=singleton.Singleton):
|
||||
owner_type = transDict.get(type(wichObject), None)
|
||||
|
||||
if owner_type is not None:
|
||||
self.__log(owner_type, wichObject.id, level, message, source, avoidDuplicates) # type: ignore
|
||||
try:
|
||||
self.__log(owner_type, wichObject.id, level, message, source, avoidDuplicates) # type: ignore
|
||||
except Exception:
|
||||
logger.error('Error logging: %s:%s %s - %s %s', owner_type, wichObject.id, level, message, source) # type: ignore
|
||||
else:
|
||||
logger.debug(
|
||||
'Requested doLog for a type of object not covered: %s', wichObject
|
||||
|
@ -72,7 +72,7 @@ def _requestActor(
|
||||
'No notification urls for {}'.format(userService.friendly_name)
|
||||
)
|
||||
|
||||
minVersion = minVersion or '2.0.0'
|
||||
minVersion = minVersion or '3.5.0'
|
||||
version = userService.getProperty('actor_version') or '0.0.0'
|
||||
if '-' in version or version < minVersion:
|
||||
logger.warning(
|
||||
|
1
server/src/uds/test/web/client/__init__.py
Normal file
1
server/src/uds/test/web/client/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .login import *
|
@ -30,6 +30,7 @@
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django import forms
|
||||
@ -69,4 +70,4 @@ class LoginForm(forms.Form):
|
||||
continue
|
||||
choices.append((a.uuid, a.name))
|
||||
|
||||
self.fields['authenticator'].choices = choices
|
||||
typing.cast(forms.ChoiceField, self.fields['authenticator']).choices = choices
|
||||
|
@ -37,7 +37,7 @@ from django.http import HttpRequest, HttpResponse, JsonResponse, HttpResponseRed
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django.urls import reverse
|
||||
from uds.core.util.request import ExtendedHttpRequest, ExtendedHttpRequestWithUser
|
||||
from uds.core.auths import auth, exceptions
|
||||
from uds.core.auths import auth
|
||||
|
||||
from uds.web.util import errors
|
||||
from uds.web.forms.LoginForm import LoginForm
|
||||
@ -74,6 +74,7 @@ def login(
|
||||
) -> HttpResponse:
|
||||
# Default empty form
|
||||
logger.debug('Tag: %s', tag)
|
||||
response: typing.Optional[HttpResponse] = None
|
||||
if request.method == 'POST':
|
||||
request.session['restricted'] = False # Access is from login
|
||||
form = LoginForm(request.POST, tag=tag)
|
||||
@ -90,19 +91,18 @@ def login(
|
||||
if loginResult.url: # Redirection
|
||||
return HttpResponseRedirect(loginResult.url)
|
||||
|
||||
time.sleep(2) # On failure, wait a bit...
|
||||
if request.ip != '127.0.0.1':
|
||||
time.sleep(2) # On failure, wait a bit if not localhost
|
||||
# If error is numeric, redirect...
|
||||
if loginResult.errid:
|
||||
return errors.errorView(request, loginResult.errid)
|
||||
|
||||
# Error, set error on session for process for js
|
||||
request.session['errors'] = [loginResult.errstr]
|
||||
return index(request)
|
||||
else:
|
||||
request.session['tag'] = tag
|
||||
response = index(request)
|
||||
|
||||
return response
|
||||
return response or index(request)
|
||||
|
||||
|
||||
@never_cache
|
||||
|
Loading…
Reference in New Issue
Block a user