mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-03 01:17:56 +03:00
Refactor authentication method names for consistency: replaced 'web_login' and 'web_logout' with 'weblogin' and 'weblogout' across multiple files
Fixed security to, in case of ip change on secured sessions, redirect to login instead of showing a "forbidden"
This commit is contained in:
parent
14278daa14
commit
6ba08a361a
@ -38,7 +38,7 @@ from django.template import RequestContext, loader
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from uds.core import consts
|
||||
from uds.core.auths.auth import web_login_required
|
||||
from uds.core.auths.auth import weblogin_required
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -46,7 +46,7 @@ if typing.TYPE_CHECKING:
|
||||
from django.http import HttpRequest
|
||||
|
||||
|
||||
@web_login_required(admin=True)
|
||||
@weblogin_required(admin=True)
|
||||
def index(request: 'HttpRequest') -> HttpResponse:
|
||||
# Gets csrf token
|
||||
csrf_token = csrf.get_token(request)
|
||||
@ -58,7 +58,7 @@ def index(request: 'HttpRequest') -> HttpResponse:
|
||||
)
|
||||
|
||||
|
||||
@web_login_required(admin=True)
|
||||
@weblogin_required(admin=True)
|
||||
def tmpl(request: 'HttpRequest', template: str) -> HttpResponse:
|
||||
try:
|
||||
t = loader.get_template('uds/admin/tmpl/' + template + ".html")
|
||||
@ -70,6 +70,6 @@ def tmpl(request: 'HttpRequest', template: str) -> HttpResponse:
|
||||
return HttpResponse(resp, content_type="text/plain")
|
||||
|
||||
|
||||
@web_login_required(admin=True)
|
||||
@weblogin_required(admin=True)
|
||||
def sample(request: 'HttpRequest') -> HttpResponse:
|
||||
return render(request, 'uds/admin/sample.html')
|
||||
|
@ -116,7 +116,7 @@ def root_user() -> models.User:
|
||||
|
||||
|
||||
# Decorator to make easier protect pages that needs to be logged in
|
||||
def web_login_required(
|
||||
def weblogin_required(
|
||||
admin: typing.Union[bool, typing.Literal['admin']] = False
|
||||
) -> collections.abc.Callable[
|
||||
[collections.abc.Callable[..., HttpResponse]], collections.abc.Callable[..., HttpResponse]
|
||||
@ -149,7 +149,7 @@ def web_login_required(
|
||||
"""
|
||||
# If no user or user authorization is not completed...
|
||||
if not request.user or not request.authorized:
|
||||
return HttpResponseRedirect(reverse('page.login'))
|
||||
return weblogout(request)
|
||||
|
||||
if admin in (True, 'admin'):
|
||||
if request.user.is_staff() is False or (admin == 'admin' and not request.user.is_admin):
|
||||
@ -200,7 +200,7 @@ def needs_trusted_source(
|
||||
|
||||
|
||||
# decorator to deny non authenticated requests
|
||||
# The difference with web_login_required is that this one does not redirect to login page
|
||||
# The difference with weblogin_required is that this one does not redirect to login page
|
||||
# it's designed to be used in ajax calls mainly
|
||||
def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]:
|
||||
@wraps(view_func)
|
||||
@ -385,7 +385,7 @@ def authenticate_info_url(authenticator: typing.Union[str, bytes, models.Authent
|
||||
return reverse('page.auth.info', kwargs={'authenticator_name': name})
|
||||
|
||||
|
||||
def web_login(
|
||||
def weblogin(
|
||||
request: 'types.requests.ExtendedHttpRequest',
|
||||
response: typing.Optional[HttpResponse],
|
||||
user: models.User,
|
||||
@ -434,11 +434,11 @@ def web_login(
|
||||
return True
|
||||
|
||||
|
||||
def web_password(request: HttpRequest) -> str:
|
||||
def get_webpassword(request: HttpRequest) -> str:
|
||||
"""
|
||||
The password is stored at session using a simple scramble algorithm that keeps the password splited at
|
||||
session (db) and client browser cookies. This method uses this two values to recompose the user password
|
||||
so we can provide it to remote sessions.
|
||||
so we can provide it to remote sessions. (this way, the password is never completely stored at any side)
|
||||
"""
|
||||
if hasattr(request, '_cryptedpass') and hasattr(request, '_scrambler'):
|
||||
return CryptoManager.manager().symmetric_decrypt(
|
||||
@ -449,8 +449,9 @@ def web_password(request: HttpRequest) -> str:
|
||||
return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string
|
||||
|
||||
|
||||
def web_logout(
|
||||
request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None
|
||||
def weblogout(
|
||||
request: 'types.requests.ExtendedHttpRequest',
|
||||
exit_url: typing.Optional[str] = None,
|
||||
) -> HttpResponse:
|
||||
"""
|
||||
Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway
|
||||
@ -462,7 +463,8 @@ def web_logout(
|
||||
else:
|
||||
# remove, if exists, tag from session
|
||||
exit_page = reverse(types.auth.AuthenticationInternalUrl.LOGIN)
|
||||
exit_url = exit_url or exit_page
|
||||
|
||||
response = HttpResponseRedirect(exit_url or exit_page)
|
||||
try:
|
||||
if request.user:
|
||||
authenticator = request.user.manager.get_instance()
|
||||
@ -478,16 +480,12 @@ def web_logout(
|
||||
username=request.user.name,
|
||||
srcip=request.ip,
|
||||
)
|
||||
else: # No user, redirect to /
|
||||
return HttpResponseRedirect(exit_page)
|
||||
authenticator.hook_web_logout(username, request, response)
|
||||
finally:
|
||||
# Try to delete session
|
||||
request.session.flush()
|
||||
request.authorized = False
|
||||
|
||||
response = HttpResponseRedirect(exit_url)
|
||||
if authenticator:
|
||||
authenticator.hook_web_logout(username, request, response)
|
||||
return response
|
||||
|
||||
|
||||
|
@ -46,7 +46,7 @@ from uds.auths.Radius.client import (
|
||||
NOT_NEEDED,
|
||||
# NEEDED
|
||||
)
|
||||
from uds.core.auths.auth import web_password
|
||||
from uds.core.auths.auth import get_webpassword
|
||||
from uds.core.util import fields
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
@ -180,7 +180,7 @@ class RadiusOTP(mfas.MFA):
|
||||
if self.send_just_username.value:
|
||||
username = username.strip().split('@')[0].split('\\')[-1]
|
||||
|
||||
web_pwd = web_password(request)
|
||||
web_pwd = get_webpassword(request)
|
||||
try:
|
||||
connection = self.radius_client()
|
||||
auth_reply = connection.authenticate_challenge(username, password=web_pwd)
|
||||
@ -250,7 +250,7 @@ class RadiusOTP(mfas.MFA):
|
||||
try:
|
||||
err = _('Invalid OTP code')
|
||||
|
||||
web_pwd = web_password(request)
|
||||
web_pwd = get_webpassword(request)
|
||||
try:
|
||||
connection = self.radius_client()
|
||||
state = request.session.get(client.STATE_VAR_NAME, b'')
|
||||
|
@ -41,7 +41,7 @@ from uds.core import consts, types
|
||||
from uds.core.auths.auth import (
|
||||
is_trusted_ip_forwarder,
|
||||
root_user,
|
||||
web_logout,
|
||||
weblogout,
|
||||
)
|
||||
from uds.models import User
|
||||
|
||||
@ -151,7 +151,7 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes
|
||||
expiry = now
|
||||
if expiry < now:
|
||||
try:
|
||||
return web_logout(request=request)
|
||||
return weblogout(request=request)
|
||||
except Exception: # nosec: intentionaly catching all exceptions and ignoring them
|
||||
pass # If fails, we don't care, we just want to logout
|
||||
return HttpResponseForbidden(content='Session Expired', content_type='text/plain')
|
||||
|
@ -38,7 +38,7 @@ from django.http import HttpResponseForbidden
|
||||
|
||||
from uds.core import consts
|
||||
from uds.core.util.config import GlobalConfig
|
||||
from uds.core.auths.auth import is_trusted_source
|
||||
from uds.core.auths.auth import is_trusted_source, weblogout
|
||||
|
||||
from . import builder
|
||||
|
||||
@ -79,7 +79,10 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes
|
||||
request.user,
|
||||
request.session.get('ip', None),
|
||||
)
|
||||
return HttpResponseForbidden(content='Forbbiden', content_type='text/plain')
|
||||
|
||||
# Clear session and redirect to login, skipping manager
|
||||
weblogout(request)
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
@ -38,7 +38,7 @@ from django.utils import formats
|
||||
from django.utils.translation import gettext
|
||||
|
||||
from uds.core import types
|
||||
from uds.core.auths.auth import web_password
|
||||
from uds.core.auths.auth import get_webpassword
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.exceptions.services import (
|
||||
@ -432,7 +432,7 @@ def enable_service(
|
||||
request.user, request.os, request.ip, service_id, transport_id, test_userservice_status=False
|
||||
)
|
||||
scrambler = CryptoManager().random_string(32)
|
||||
password = CryptoManager().symmetric_encrypt(web_password(request), scrambler)
|
||||
password = CryptoManager().symmetric_encrypt(get_webpassword(request), scrambler)
|
||||
|
||||
info.userservice.properties['accessed_by_client'] = False # Reset accesed property to
|
||||
|
||||
|
@ -42,7 +42,7 @@ from django.views.decorators.csrf import csrf_exempt
|
||||
|
||||
from uds.core import auths, exceptions, types
|
||||
from uds.core.auths import auth
|
||||
from uds.core.auths.auth import authenticate_via_callback, log_login, uds_cookie, web_login, web_logout
|
||||
from uds.core.auths.auth import authenticate_via_callback, log_login, uds_cookie, weblogin, weblogout
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.exceptions.services import ServiceNotReadyError
|
||||
@ -118,7 +118,7 @@ def auth_callback_stage2(request: 'ExtendedHttpRequestWithUser', ticket_id: str)
|
||||
|
||||
response = HttpResponseRedirect(reverse('page.index'))
|
||||
|
||||
web_login(request, response, result.user, '') # Password is unavailable in this case
|
||||
weblogin(request, response, result.user, '') # Password is unavailable in this case
|
||||
|
||||
log_login(request, authenticator, result.user.name, 'Federated login') # Nice login, just indicating it's federated
|
||||
|
||||
@ -134,7 +134,7 @@ def auth_callback_stage2(request: 'ExtendedHttpRequestWithUser', ticket_id: str)
|
||||
except exceptions.auth.Redirect as e:
|
||||
return HttpResponseRedirect(request.build_absolute_uri(str(e)) if e.args and e.args[0] else '/')
|
||||
except exceptions.auth.Logout as e:
|
||||
return web_logout(
|
||||
return weblogout(
|
||||
request,
|
||||
request.build_absolute_uri(str(e)) if e.args and e.args[0] else None,
|
||||
)
|
||||
@ -240,7 +240,7 @@ def ticket_auth(
|
||||
usr.groups.set(grps)
|
||||
|
||||
# Force cookie generation
|
||||
web_login(request, None, usr, password)
|
||||
weblogin(request, None, usr, password)
|
||||
|
||||
# Log the login
|
||||
log_login(request, auth, username, 'Ticket authentication') # Nice login, just indicating it's using a ticket
|
||||
@ -314,7 +314,7 @@ def login(request: types.requests.ExtendedHttpRequest, tag: typing.Optional[str]
|
||||
# Tag is not removed from session, so next login will have it even if not provided
|
||||
# This means than once an url is used, unless manually goes to "/uds/page/login/xxx"
|
||||
# The tag will be used again
|
||||
auth.web_login(
|
||||
auth.weblogin(
|
||||
request, response, login_result.user, login_result.password
|
||||
) # data is user password here
|
||||
|
||||
@ -350,7 +350,7 @@ def login(request: types.requests.ExtendedHttpRequest, tag: typing.Optional[str]
|
||||
|
||||
|
||||
@never_cache
|
||||
@auth.web_login_required(admin=False)
|
||||
@auth.weblogin_required(admin=False)
|
||||
def logout(request: types.requests.ExtendedHttpRequestWithUser) -> HttpResponse:
|
||||
auth.log_logout(request)
|
||||
request.session['restricted'] = False # Remove restricted
|
||||
@ -358,4 +358,4 @@ def logout(request: types.requests.ExtendedHttpRequestWithUser) -> HttpResponse:
|
||||
logout_response = request.user.logout(request)
|
||||
url = logout_response.url if logout_response.success == types.auth.AuthenticationState.REDIRECT else None
|
||||
|
||||
return auth.web_logout(request, url or request.session.get('logouturl', None))
|
||||
return auth.weblogout(request, url or request.session.get('logouturl', None))
|
||||
|
@ -31,7 +31,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from uds.core.auths.auth import web_login_required
|
||||
from uds.core.auths.auth import weblogin_required
|
||||
from uds.core.managers import downloads_manager
|
||||
from .main import index
|
||||
|
||||
@ -45,7 +45,7 @@ if typing.TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@web_login_required(admin=True)
|
||||
@weblogin_required(admin=True)
|
||||
def download(request: 'HttpRequest', download_id: str) -> 'HttpResponse':
|
||||
"""
|
||||
Downloadables management
|
||||
|
@ -41,7 +41,7 @@ from django.views.decorators.csrf import csrf_exempt
|
||||
from uds import models
|
||||
from uds.core import types
|
||||
from uds.core.auths import auth
|
||||
from uds.core.auths.auth import web_login_required, web_password
|
||||
from uds.core.auths.auth import weblogin_required, get_webpassword
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.types.requests import ExtendedHttpRequest
|
||||
@ -60,7 +60,7 @@ if typing.TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@web_login_required(admin=False)
|
||||
@weblogin_required(admin=False)
|
||||
def transport_own_link(
|
||||
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
|
||||
) -> HttpResponse:
|
||||
@ -83,7 +83,7 @@ def transport_own_link(
|
||||
info.ip,
|
||||
request.os,
|
||||
request.user,
|
||||
web_password(request),
|
||||
get_webpassword(request),
|
||||
request,
|
||||
),
|
||||
)
|
||||
@ -106,7 +106,7 @@ def transport_own_link(
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
@web_login_required(admin=False)
|
||||
@weblogin_required(admin=False)
|
||||
@never_cache
|
||||
def user_service_enabler(
|
||||
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
|
||||
@ -126,7 +126,7 @@ def closer(request: 'ExtendedHttpRequest') -> HttpResponse:
|
||||
# return HttpResponse('<html><body onload="window.close()"></body></html>')
|
||||
|
||||
|
||||
@web_login_required(admin=False)
|
||||
@weblogin_required(admin=False)
|
||||
@never_cache
|
||||
def user_service_status(
|
||||
request: 'ExtendedHttpRequestWithUser', service_id: str, transport_id: str
|
||||
@ -170,7 +170,7 @@ def user_service_status(
|
||||
return HttpResponse(json.dumps({'status': status}), content_type='application/json')
|
||||
|
||||
|
||||
@web_login_required(admin=False)
|
||||
@weblogin_required(admin=False)
|
||||
@never_cache
|
||||
def action(request: 'ExtendedHttpRequestWithUser', service_id: str, action_string: str) -> HttpResponse:
|
||||
userservice = UserServiceManager.manager().locate_meta_service(request.user, service_id)
|
||||
|
Loading…
Reference in New Issue
Block a user