From 6ce5cbe10e05ff3cd854a49a9153218e869a315a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Thu, 19 Sep 2024 19:32:42 +0200 Subject: [PATCH] Some improvements to Authentication limitation by network (Extended to REST api and whatever uses authenticator) --- server/src/uds/REST/methods/login_logout.py | 4 +- server/src/uds/core/auths/auth.py | 88 ++++++++++----------- server/src/uds/core/types/auth.py | 2 +- server/src/uds/urls.py | 2 +- server/src/uds/web/util/authentication.py | 10 --- 5 files changed, 48 insertions(+), 58 deletions(-) diff --git a/server/src/uds/REST/methods/login_logout.py b/server/src/uds/REST/methods/login_logout.py index fb4e42473..097b8d863 100644 --- a/server/src/uds/REST/methods/login_logout.py +++ b/server/src/uds/REST/methods/login_logout.py @@ -179,7 +179,7 @@ class Login(Handler): # And store in cache for blocking for a while if fails fail_cache.put(self._request.ip, fails + 1, GlobalConfig.LOGIN_BLOCK.as_int()) - return Login.result(error='Invalid credentials') + return Login.result(error=auth_result.errstr or 'Invalid credentials') return Login.result( result='ok', token=self.gen_auth_token( @@ -238,7 +238,7 @@ class Auths(Handler): 'auth': auth.name, 'type': theType.type_type, 'priority': auth.priority, - 'isCustom': theType.is_custom(), # Deprecated, use 'custom' + 'isCustom': theType.is_custom(), # Deprecated, use 'custom' 'custom': theType.is_custom(), } diff --git a/server/src/uds/core/auths/auth.py b/server/src/uds/core/auths/auth.py index 9c5e8214e..e7f4f6e8a 100644 --- a/server/src/uds/core/auths/auth.py +++ b/server/src/uds/core/auths/auth.py @@ -47,12 +47,9 @@ from django.utils.translation import gettext as _ from uds import models from uds.core import auths, consts, exceptions, types from uds.core.auths import Authenticator as AuthenticatorInstance -from uds.core.managers.crypto import CryptoManager -from uds.core.types.requests import ExtendedHttpRequest -from uds.core.types.states import State from uds.core.util import config, log, net -from uds.core.util.config import GlobalConfig from uds.core.util.stats import events +from uds.core.managers.crypto import CryptoManager # Not imported at runtime, just for type checking if typing.TYPE_CHECKING: @@ -67,11 +64,6 @@ RT = typing.TypeVar('RT') # Local type only -class AuthResult(typing.NamedTuple): - user: typing.Optional[models.User] = None - url: typing.Optional[str] = None - - def uds_cookie( request: HttpRequest, response: typing.Optional[HttpResponse] = None, @@ -87,7 +79,7 @@ def uds_cookie( 'uds', cookie, samesite='Lax', - httponly=GlobalConfig.ENHANCED_SECURITY.as_bool(), + httponly=config.GlobalConfig.ENHANCED_SECURITY.as_bool(), ) request.COOKIES['uds'] = cookie else: @@ -108,9 +100,9 @@ def root_user() -> models.User: """ user = models.User( id=consts.auth.ROOT_ID, - name=GlobalConfig.SUPER_USER_LOGIN.get(True), + name=config.GlobalConfig.SUPER_USER_LOGIN.get(True), real_name=_('System Administrator'), - state=State.ACTIVE, + state=types.states.State.ACTIVE, staff_member=True, is_admin=True, ) @@ -150,7 +142,7 @@ def web_login_required( ) -> collections.abc.Callable[..., HttpResponse]: @wraps(view_func) def _wrapped_view( - request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any + request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any ) -> HttpResponse: """ Wrapped function for decorator @@ -172,11 +164,11 @@ def web_login_required( # Helper for checking if requests is from trusted source def is_trusted_source(ip: str) -> bool: - return net.contains(GlobalConfig.TRUSTED_SOURCES.get(True), ip) + return net.contains(config.GlobalConfig.TRUSTED_SOURCES.get(True), ip) def is_trusted_ip_forwarder(ip: str) -> bool: - return net.contains(GlobalConfig.ALLOWED_IP_FORWARDERS.get(True), ip) + return net.contains(config.GlobalConfig.ALLOWED_IP_FORWARDERS.get(True), ip) # Decorator to protect pages that needs to be accessed from "trusted sites" @@ -188,7 +180,7 @@ def needs_trusted_source( """ @wraps(view_func) - def _wrapped_view(request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse: + def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse: """ Wrapped function for decorator """ @@ -198,7 +190,7 @@ def needs_trusted_source( except Exception: logger.warning( 'Error checking trusted source: "%s" does not seems to be a valid network string. Using Unrestricted access.', - GlobalConfig.TRUSTED_SOURCES.get(), + config.GlobalConfig.TRUSTED_SOURCES.get(), ) return view_func(request, *args, **kwargs) @@ -210,7 +202,7 @@ def needs_trusted_source( # it's designed to be used in ajax calls mainly def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]: @wraps(view_func) - def _wrapped_view(request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT: + def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT: if not request.user or not request.authorized: return HttpResponseForbidden() # type: ignore return view_func(request, *args, **kwargs) @@ -222,8 +214,8 @@ def register_user( authenticator: models.Authenticator, auth_instance: AuthenticatorInstance, username: str, - request: 'ExtendedHttpRequest', -) -> AuthResult: + request: 'types.requests.ExtendedHttpRequest', +) -> types.auth.LoginResult: """ Check if this user already exists on database with this authenticator, if don't, create it with defaults This will work correctly with both internal or externals cause we first authenticate the user, if internal and user do not exists in database @@ -235,7 +227,7 @@ def register_user( usr = authenticator.get_or_create_user(username, username) usr.real_name = auth_instance.get_real_name(username) usr.save() - if usr and State.from_str(usr.state).is_active(): + if usr and types.states.State.from_str(usr.state).is_active(): # Now we update database groups for this user usr.get_manager().recreate_groups(usr) # And add an login event @@ -247,17 +239,17 @@ def register_user( browser=request.os.browser, version=request.os.version, ) - return AuthResult(user=usr) + return types.auth.LoginResult(user=usr) - return AuthResult() + return types.auth.LoginResult() def authenticate( username: str, password: str, authenticator: models.Authenticator, - request: 'ExtendedHttpRequest', -) -> AuthResult: + request: 'types.requests.ExtendedHttpRequest', +) -> types.auth.LoginResult: """ Given an username, password and authenticator, try to authenticate user @param username: username to authenticate @@ -266,7 +258,7 @@ def authenticate( @param request: Request object @return: - An AuthResult indicating: + An types.auth.LoginResult indicating: user if success in logging in field user or None if not url if not success in logging in field url so instead of error UDS will redirect to this url @@ -275,29 +267,37 @@ def authenticate( logger.debug('Authenticating user %s with authenticator %s', username, authenticator) # If global root auth is enabled && user/password is correct, + # Note: From now onwards, root "we user" can only login from a trusted source if ( - GlobalConfig.SUPER_USER_ALLOW_WEBACCESS.as_bool(True) - and username == GlobalConfig.SUPER_USER_LOGIN.get(True) - and CryptoManager().check_hash(password, GlobalConfig.SUPER_USER_PASS.get(True)) + config.GlobalConfig.SUPER_USER_ALLOW_WEBACCESS.as_bool(True) + and is_trusted_source(request.ip) + and username == config.GlobalConfig.SUPER_USER_LOGIN.get(True) + and CryptoManager.manager().check_hash(password, config.GlobalConfig.SUPER_USER_PASS.get(True)) ): - return AuthResult(user=root_user()) + return types.auth.LoginResult(user=root_user()) gm = auths.GroupsManager(authenticator) auth_instance = authenticator.get_instance() - if auth_instance.is_ip_allowed(request) is False: - logger.info('Access tried from an unallowed source') - return AuthResult() + if auth_instance.is_ip_allowed(request=request) is False: + log_login( + request, + authenticator, + username, + 'Access tried from an unallowed source', + as_error=True, + ) + return types.auth.LoginResult(errstr=_('Access tried from an unallowed source')) res = auth_instance.authenticate(username, password, gm, request) if res.success == types.auth.AuthenticationState.FAIL: logger.debug('Authentication failed') # Maybe it's an redirection on auth failed? - return AuthResult() + return types.auth.LoginResult() if res.success == types.auth.AuthenticationState.REDIRECT: - return AuthResult(url=res.url) + return types.auth.LoginResult(url=res.url) logger.debug('Groups manager: %s', gm) @@ -307,7 +307,7 @@ def authenticate( 'User %s has been authenticated, but he does not belongs to any UDS known group', username, ) - return AuthResult() + return types.auth.LoginResult() return register_user(authenticator, auth_instance, username, request) @@ -316,7 +316,7 @@ def authenticate_via_callback( authenticator: models.Authenticator, params: 'types.auth.AuthCallbackParams', request: 'ExtendedHttpRequestWithUser', -) -> AuthResult: +) -> types.auth.LoginResult: """ Given an username, this method will get invoked whenever the url for a callback for an authenticator is requested. @@ -349,7 +349,7 @@ def authenticate_via_callback( raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS') if result.success == types.auth.AuthenticationState.REDIRECT: - return AuthResult(url=result.url) + return types.auth.LoginResult(url=result.url) if result.username: return register_user(authenticator, auth_instance, result.username or '', request) @@ -381,7 +381,7 @@ def authenticate_info_url(authenticator: typing.Union[str, bytes, models.Authent def web_login( - request: 'ExtendedHttpRequest', + request: 'types.requests.ExtendedHttpRequest', response: typing.Optional[HttpResponse], user: models.User, password: str, @@ -405,7 +405,7 @@ def web_login( # Store request ip in session request.session[consts.auth.SESSION_IP_KEY] = request.ip # If Enabled zero trust, do not cache credentials - if GlobalConfig.ENFORCE_ZERO_TRUST.as_bool(False): + if config.GlobalConfig.ENFORCE_ZERO_TRUST.as_bool(False): password = '' # nosec: clear password if zero trust is enabled request.session[consts.auth.SESSION_USER_KEY] = user.id @@ -444,14 +444,14 @@ def web_password(request: HttpRequest) -> str: return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string -def web_logout(request: 'ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse: +def web_logout(request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse: """ Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway by django in regular basis. """ tag = request.session.pop('tag', None) if tag and config.GlobalConfig.REDIRECT_TO_TAG_ON_LOGOUT.as_bool(False): - exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN_TAG, kwargs={'tag': tag}) + exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN_LABEL, kwargs={'tag': tag}) else: # remove, if exists, tag from session exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN) @@ -485,7 +485,7 @@ def web_logout(request: 'ExtendedHttpRequest', exit_url: typing.Optional[str] = def log_login( - request: 'ExtendedHttpRequest', + request: 'types.requests.ExtendedHttpRequest', authenticator: models.Authenticator, userName: str, log_string: str = '', @@ -531,7 +531,7 @@ def log_login( logger.info('Root %s from %s where OS is %s', log_string, request.ip, request.os.os.name) -def log_logout(request: 'ExtendedHttpRequest') -> None: +def log_logout(request: 'types.requests.ExtendedHttpRequest') -> None: if request.user: if request.user.manager.id: log.log( diff --git a/server/src/uds/core/types/auth.py b/server/src/uds/core/types/auth.py index ae5ffce63..9bfa2c3f1 100644 --- a/server/src/uds/core/types/auth.py +++ b/server/src/uds/core/types/auth.py @@ -58,7 +58,7 @@ class AuthenticationInternalUrl(enum.StrEnum): """ LOGIN = 'page.login' - LOGIN_TAG = 'page.login.tag' + LOGIN_LABEL = 'page.login.tag' LOGOUT = 'page.logout' def get_url(self) -> str: diff --git a/server/src/uds/urls.py b/server/src/uds/urls.py index 1265756cb..aa70b68c7 100644 --- a/server/src/uds/urls.py +++ b/server/src/uds/urls.py @@ -101,7 +101,7 @@ urlpatterns = [ re_path( r'^uds/page/login/(?P[a-zA-Z0-9-]+)$', uds.web.views.main.login, - name=types.auth.AuthenticationInternalUrl.LOGIN_TAG.value, + name=types.auth.AuthenticationInternalUrl.LOGIN_LABEL.value, ), path( r'uds/page/logout', diff --git a/server/src/uds/web/util/authentication.py b/server/src/uds/web/util/authentication.py index cffbe75f6..7da762ae0 100644 --- a/server/src/uds/web/util/authentication.py +++ b/server/src/uds/web/util/authentication.py @@ -80,16 +80,6 @@ def check_login( # pylint: disable=too-many-branches, too-many-statements log_login(request, authenticator, username, 'Temporarily blocked', as_error=True) return types.auth.LoginResult(errstr=_('Too many authentication errrors. User temporarily blocked')) # check if authenticator is visible for this requests - if auth_instance.is_ip_allowed(request=request) is False: - log_login( - request, - authenticator, - username, - 'Access tried from an unallowed source', - as_error=True, - ) - return types.auth.LoginResult(errstr=_('Access tried from an unallowed source')) - password = ( form.cleaned_data['password'] or 'axd56adhg466jasd6q8sadñ€sáé--v' ) # Random string, in fact, just a placeholder that will not be used :)