1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Some improvements to Authentication limitation by network (Extended to REST api and whatever uses authenticator)

This commit is contained in:
Adolfo Gómez García 2024-09-19 19:32:42 +02:00
parent fc671f2a7a
commit 6ce5cbe10e
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 48 additions and 58 deletions

View File

@ -179,7 +179,7 @@ class Login(Handler):
# And store in cache for blocking for a while if fails
fail_cache.put(self._request.ip, fails + 1, GlobalConfig.LOGIN_BLOCK.as_int())
return Login.result(error='Invalid credentials')
return Login.result(error=auth_result.errstr or 'Invalid credentials')
return Login.result(
result='ok',
token=self.gen_auth_token(
@ -238,7 +238,7 @@ class Auths(Handler):
'auth': auth.name,
'type': theType.type_type,
'priority': auth.priority,
'isCustom': theType.is_custom(), # Deprecated, use 'custom'
'isCustom': theType.is_custom(), # Deprecated, use 'custom'
'custom': theType.is_custom(),
}

View File

@ -47,12 +47,9 @@ from django.utils.translation import gettext as _
from uds import models
from uds.core import auths, consts, exceptions, types
from uds.core.auths import Authenticator as AuthenticatorInstance
from uds.core.managers.crypto import CryptoManager
from uds.core.types.requests import ExtendedHttpRequest
from uds.core.types.states import State
from uds.core.util import config, log, net
from uds.core.util.config import GlobalConfig
from uds.core.util.stats import events
from uds.core.managers.crypto import CryptoManager
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -67,11 +64,6 @@ RT = typing.TypeVar('RT')
# Local type only
class AuthResult(typing.NamedTuple):
user: typing.Optional[models.User] = None
url: typing.Optional[str] = None
def uds_cookie(
request: HttpRequest,
response: typing.Optional[HttpResponse] = None,
@ -87,7 +79,7 @@ def uds_cookie(
'uds',
cookie,
samesite='Lax',
httponly=GlobalConfig.ENHANCED_SECURITY.as_bool(),
httponly=config.GlobalConfig.ENHANCED_SECURITY.as_bool(),
)
request.COOKIES['uds'] = cookie
else:
@ -108,9 +100,9 @@ def root_user() -> models.User:
"""
user = models.User(
id=consts.auth.ROOT_ID,
name=GlobalConfig.SUPER_USER_LOGIN.get(True),
name=config.GlobalConfig.SUPER_USER_LOGIN.get(True),
real_name=_('System Administrator'),
state=State.ACTIVE,
state=types.states.State.ACTIVE,
staff_member=True,
is_admin=True,
)
@ -150,7 +142,7 @@ def web_login_required(
) -> collections.abc.Callable[..., HttpResponse]:
@wraps(view_func)
def _wrapped_view(
request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
) -> HttpResponse:
"""
Wrapped function for decorator
@ -172,11 +164,11 @@ def web_login_required(
# Helper for checking if requests is from trusted source
def is_trusted_source(ip: str) -> bool:
return net.contains(GlobalConfig.TRUSTED_SOURCES.get(True), ip)
return net.contains(config.GlobalConfig.TRUSTED_SOURCES.get(True), ip)
def is_trusted_ip_forwarder(ip: str) -> bool:
return net.contains(GlobalConfig.ALLOWED_IP_FORWARDERS.get(True), ip)
return net.contains(config.GlobalConfig.ALLOWED_IP_FORWARDERS.get(True), ip)
# Decorator to protect pages that needs to be accessed from "trusted sites"
@ -188,7 +180,7 @@ def needs_trusted_source(
"""
@wraps(view_func)
def _wrapped_view(request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse:
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse:
"""
Wrapped function for decorator
"""
@ -198,7 +190,7 @@ def needs_trusted_source(
except Exception:
logger.warning(
'Error checking trusted source: "%s" does not seems to be a valid network string. Using Unrestricted access.',
GlobalConfig.TRUSTED_SOURCES.get(),
config.GlobalConfig.TRUSTED_SOURCES.get(),
)
return view_func(request, *args, **kwargs)
@ -210,7 +202,7 @@ def needs_trusted_source(
# it's designed to be used in ajax calls mainly
def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]:
@wraps(view_func)
def _wrapped_view(request: 'ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT:
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT:
if not request.user or not request.authorized:
return HttpResponseForbidden() # type: ignore
return view_func(request, *args, **kwargs)
@ -222,8 +214,8 @@ def register_user(
authenticator: models.Authenticator,
auth_instance: AuthenticatorInstance,
username: str,
request: 'ExtendedHttpRequest',
) -> AuthResult:
request: 'types.requests.ExtendedHttpRequest',
) -> types.auth.LoginResult:
"""
Check if this user already exists on database with this authenticator, if don't, create it with defaults
This will work correctly with both internal or externals cause we first authenticate the user, if internal and user do not exists in database
@ -235,7 +227,7 @@ def register_user(
usr = authenticator.get_or_create_user(username, username)
usr.real_name = auth_instance.get_real_name(username)
usr.save()
if usr and State.from_str(usr.state).is_active():
if usr and types.states.State.from_str(usr.state).is_active():
# Now we update database groups for this user
usr.get_manager().recreate_groups(usr)
# And add an login event
@ -247,17 +239,17 @@ def register_user(
browser=request.os.browser,
version=request.os.version,
)
return AuthResult(user=usr)
return types.auth.LoginResult(user=usr)
return AuthResult()
return types.auth.LoginResult()
def authenticate(
username: str,
password: str,
authenticator: models.Authenticator,
request: 'ExtendedHttpRequest',
) -> AuthResult:
request: 'types.requests.ExtendedHttpRequest',
) -> types.auth.LoginResult:
"""
Given an username, password and authenticator, try to authenticate user
@param username: username to authenticate
@ -266,7 +258,7 @@ def authenticate(
@param request: Request object
@return:
An AuthResult indicating:
An types.auth.LoginResult indicating:
user if success in logging in field user or None if not
url if not success in logging in field url so instead of error UDS will redirect to this url
@ -275,29 +267,37 @@ def authenticate(
logger.debug('Authenticating user %s with authenticator %s', username, authenticator)
# If global root auth is enabled && user/password is correct,
# Note: From now onwards, root "we user" can only login from a trusted source
if (
GlobalConfig.SUPER_USER_ALLOW_WEBACCESS.as_bool(True)
and username == GlobalConfig.SUPER_USER_LOGIN.get(True)
and CryptoManager().check_hash(password, GlobalConfig.SUPER_USER_PASS.get(True))
config.GlobalConfig.SUPER_USER_ALLOW_WEBACCESS.as_bool(True)
and is_trusted_source(request.ip)
and username == config.GlobalConfig.SUPER_USER_LOGIN.get(True)
and CryptoManager.manager().check_hash(password, config.GlobalConfig.SUPER_USER_PASS.get(True))
):
return AuthResult(user=root_user())
return types.auth.LoginResult(user=root_user())
gm = auths.GroupsManager(authenticator)
auth_instance = authenticator.get_instance()
if auth_instance.is_ip_allowed(request) is False:
logger.info('Access tried from an unallowed source')
return AuthResult()
if auth_instance.is_ip_allowed(request=request) is False:
log_login(
request,
authenticator,
username,
'Access tried from an unallowed source',
as_error=True,
)
return types.auth.LoginResult(errstr=_('Access tried from an unallowed source'))
res = auth_instance.authenticate(username, password, gm, request)
if res.success == types.auth.AuthenticationState.FAIL:
logger.debug('Authentication failed')
# Maybe it's an redirection on auth failed?
return AuthResult()
return types.auth.LoginResult()
if res.success == types.auth.AuthenticationState.REDIRECT:
return AuthResult(url=res.url)
return types.auth.LoginResult(url=res.url)
logger.debug('Groups manager: %s', gm)
@ -307,7 +307,7 @@ def authenticate(
'User %s has been authenticated, but he does not belongs to any UDS known group',
username,
)
return AuthResult()
return types.auth.LoginResult()
return register_user(authenticator, auth_instance, username, request)
@ -316,7 +316,7 @@ def authenticate_via_callback(
authenticator: models.Authenticator,
params: 'types.auth.AuthCallbackParams',
request: 'ExtendedHttpRequestWithUser',
) -> AuthResult:
) -> types.auth.LoginResult:
"""
Given an username, this method will get invoked whenever the url for a callback
for an authenticator is requested.
@ -349,7 +349,7 @@ def authenticate_via_callback(
raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS')
if result.success == types.auth.AuthenticationState.REDIRECT:
return AuthResult(url=result.url)
return types.auth.LoginResult(url=result.url)
if result.username:
return register_user(authenticator, auth_instance, result.username or '', request)
@ -381,7 +381,7 @@ def authenticate_info_url(authenticator: typing.Union[str, bytes, models.Authent
def web_login(
request: 'ExtendedHttpRequest',
request: 'types.requests.ExtendedHttpRequest',
response: typing.Optional[HttpResponse],
user: models.User,
password: str,
@ -405,7 +405,7 @@ def web_login(
# Store request ip in session
request.session[consts.auth.SESSION_IP_KEY] = request.ip
# If Enabled zero trust, do not cache credentials
if GlobalConfig.ENFORCE_ZERO_TRUST.as_bool(False):
if config.GlobalConfig.ENFORCE_ZERO_TRUST.as_bool(False):
password = '' # nosec: clear password if zero trust is enabled
request.session[consts.auth.SESSION_USER_KEY] = user.id
@ -444,14 +444,14 @@ def web_password(request: HttpRequest) -> str:
return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string
def web_logout(request: 'ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse:
def web_logout(request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse:
"""
Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway
by django in regular basis.
"""
tag = request.session.pop('tag', None)
if tag and config.GlobalConfig.REDIRECT_TO_TAG_ON_LOGOUT.as_bool(False):
exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN_TAG, kwargs={'tag': tag})
exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN_LABEL, kwargs={'tag': tag})
else:
# remove, if exists, tag from session
exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN)
@ -485,7 +485,7 @@ def web_logout(request: 'ExtendedHttpRequest', exit_url: typing.Optional[str] =
def log_login(
request: 'ExtendedHttpRequest',
request: 'types.requests.ExtendedHttpRequest',
authenticator: models.Authenticator,
userName: str,
log_string: str = '',
@ -531,7 +531,7 @@ def log_login(
logger.info('Root %s from %s where OS is %s', log_string, request.ip, request.os.os.name)
def log_logout(request: 'ExtendedHttpRequest') -> None:
def log_logout(request: 'types.requests.ExtendedHttpRequest') -> None:
if request.user:
if request.user.manager.id:
log.log(

View File

@ -58,7 +58,7 @@ class AuthenticationInternalUrl(enum.StrEnum):
"""
LOGIN = 'page.login'
LOGIN_TAG = 'page.login.tag'
LOGIN_LABEL = 'page.login.tag'
LOGOUT = 'page.logout'
def get_url(self) -> str:

View File

@ -101,7 +101,7 @@ urlpatterns = [
re_path(
r'^uds/page/login/(?P<tag>[a-zA-Z0-9-]+)$',
uds.web.views.main.login,
name=types.auth.AuthenticationInternalUrl.LOGIN_TAG.value,
name=types.auth.AuthenticationInternalUrl.LOGIN_LABEL.value,
),
path(
r'uds/page/logout',

View File

@ -80,16 +80,6 @@ def check_login( # pylint: disable=too-many-branches, too-many-statements
log_login(request, authenticator, username, 'Temporarily blocked', as_error=True)
return types.auth.LoginResult(errstr=_('Too many authentication errrors. User temporarily blocked'))
# check if authenticator is visible for this requests
if auth_instance.is_ip_allowed(request=request) is False:
log_login(
request,
authenticator,
username,
'Access tried from an unallowed source',
as_error=True,
)
return types.auth.LoginResult(errstr=_('Access tried from an unallowed source'))
password = (
form.cleaned_data['password'] or 'axd56adhg466jasd6q8sadñ€sáé--v'
) # Random string, in fact, just a placeholder that will not be used :)