forked from shaba/openuds
fixed versions
This commit is contained in:
parent
1e74c55d2b
commit
e76695e583
@ -80,15 +80,15 @@ def getUDSCookie(request: HttpRequest, response: typing.Optional[HttpResponse] =
|
||||
return cookie
|
||||
|
||||
|
||||
def getRootUser():
|
||||
def getRootUser() -> User:
|
||||
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
|
||||
u = User(id=ROOT_ID, name=GlobalConfig.SUPER_USER_LOGIN.get(True), real_name=_(
|
||||
'System Administrator'), state=State.ACTIVE, staff_member=True, is_admin=True)
|
||||
u.manager = Authenticator()
|
||||
# Fake overwrite some methods, not too "legal" maybe? :)
|
||||
u.getGroups = lambda: []
|
||||
u.updateLastAccess = lambda: None
|
||||
u.logout = lambda: None
|
||||
# Fake overwrite some methods, a bit cheating? maybe? :)
|
||||
u.getGroups = lambda: [] # type: ignore
|
||||
u.updateLastAccess = lambda: None # type: ignore
|
||||
u.logout = lambda: None # type: ignore
|
||||
return u
|
||||
|
||||
|
||||
@ -99,15 +99,17 @@ def getIp(request):
|
||||
|
||||
|
||||
# Decorator to make easier protect pages that needs to be logged in
|
||||
def webLoginRequired(admin: bool = False):
|
||||
def webLoginRequired(admin: typing.Union[bool, str] = False):
|
||||
"""
|
||||
Decorator to set protection to access page
|
||||
Look for samples at uds.core.web.views
|
||||
if admin == True, needs admin or staff
|
||||
if admin == 'admin', needs admin
|
||||
"""
|
||||
|
||||
def decorator(view_func):
|
||||
def decorator(view_func: typing.Callable):
|
||||
@wraps(view_func, assigned=available_attrs(view_func))
|
||||
def _wrapped_view(request, *args, **kwargs):
|
||||
def _wrapped_view(request: HttpRequest, *args, **kwargs):
|
||||
"""
|
||||
Wrapped function for decorator
|
||||
"""
|
||||
@ -130,14 +132,14 @@ def webLoginRequired(admin: bool = False):
|
||||
|
||||
|
||||
# Decorator to protect pages that needs to be accessed from "trusted sites"
|
||||
def trustedSourceRequired(view_func):
|
||||
def trustedSourceRequired(view_func: typing.Callable):
|
||||
"""
|
||||
Decorator to set protection to access page
|
||||
look for sample at uds.dispatchers.pam
|
||||
"""
|
||||
|
||||
@wraps(view_func)
|
||||
def _wrapped_view(request, *args, **kwargs):
|
||||
def _wrapped_view(request: HttpRequest, *args, **kwargs):
|
||||
"""
|
||||
Wrapped function for decorator
|
||||
"""
|
||||
@ -150,10 +152,10 @@ def trustedSourceRequired(view_func):
|
||||
|
||||
|
||||
# decorator to deny non authenticated requests
|
||||
def denyNonAuthenticated(view_func):
|
||||
def denyNonAuthenticated(view_func: typing.Callable):
|
||||
|
||||
@wraps(view_func)
|
||||
def _wrapped_view(request, *args, **kwargs):
|
||||
def _wrapped_view(request: HttpRequest, *args, **kwargs):
|
||||
if request.user is None:
|
||||
return HttpResponseForbidden()
|
||||
return view_func(request, *args, **kwargs)
|
||||
|
@ -117,7 +117,7 @@ class gui:
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def choiceItem(id_: str, text: str) -> gui.ChoiceType:
|
||||
def choiceItem(id_: str, text: str) -> 'gui.ChoiceType':
|
||||
"""
|
||||
Helper method to create a single choice item.
|
||||
|
||||
|
@ -32,6 +32,9 @@
|
||||
import threading
|
||||
import logging
|
||||
import typing
|
||||
import datetime
|
||||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
||||
from uds.core.util import OsDetector
|
||||
from uds.core.util.Config import GlobalConfig
|
||||
@ -41,25 +44,27 @@ from uds.models import User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_requests : typing.Dict[int, typing.Any] = {}
|
||||
_requests: typing.Dict[int, typing.Tuple[HttpRequest, datetime.datetime]] = {}
|
||||
|
||||
|
||||
def getIdent() -> typing.Optional[int]:
|
||||
return threading.current_thread().ident
|
||||
def getIdent() -> int:
|
||||
ident = threading.current_thread().ident
|
||||
return ident if ident else -1
|
||||
|
||||
|
||||
def getRequest():
|
||||
def getRequest() -> HttpRequest:
|
||||
ident = getIdent()
|
||||
if ident in _requests:
|
||||
return _requests[ident]
|
||||
return {}
|
||||
return _requests[ident][0]
|
||||
|
||||
return HttpRequest()
|
||||
|
||||
|
||||
class GlobalRequestMiddleware:
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
def __init__(self, get_response: typing.Callable[[HttpRequest], HttpResponse]):
|
||||
self._get_response: typing.Callable[[HttpRequest], HttpResponse] = get_response
|
||||
|
||||
def _process_request(self, request):
|
||||
def _process_request(self, request: HttpRequest) -> None:
|
||||
# Add IP to request
|
||||
GlobalRequestMiddleware.fillIps(request)
|
||||
# Ensures request contains os
|
||||
@ -68,10 +73,10 @@ class GlobalRequestMiddleware:
|
||||
GlobalRequestMiddleware.getUser(request)
|
||||
|
||||
# Store request on cache
|
||||
_requests[getIdent()] = request
|
||||
_requests[getIdent()] = (request, datetime.datetime.now())
|
||||
|
||||
|
||||
def _process_response(self, request, response):
|
||||
def _process_response(self, request: HttpRequest, response: HttpResponse):
|
||||
# Remove IP from global cache (processing responses after this will make global request unavailable,
|
||||
# but can be got from request again)
|
||||
ident = getIdent()
|
||||
@ -85,15 +90,15 @@ class GlobalRequestMiddleware:
|
||||
logger.exception('Deleting stored request')
|
||||
return response
|
||||
|
||||
def __call__(self, request):
|
||||
def __call__(self, request: HttpRequest):
|
||||
self._process_request(request)
|
||||
|
||||
response = self.get_response(request)
|
||||
response = self._get_response(request)
|
||||
|
||||
return self._process_response(request, response)
|
||||
|
||||
@staticmethod
|
||||
def fillIps(request):
|
||||
def fillIps(request: HttpRequest):
|
||||
"""
|
||||
Obtains the IP of a Django Request, even behind a proxy
|
||||
|
||||
@ -118,19 +123,20 @@ class GlobalRequestMiddleware:
|
||||
request.ip_proxy = request.ip
|
||||
|
||||
@staticmethod
|
||||
def getUser(request):
|
||||
def getUser(request: HttpRequest)-> None:
|
||||
"""
|
||||
Ensures request user is the correct user
|
||||
"""
|
||||
logger.debug('Getting User on Middleware')
|
||||
user = request.session.get(USER_KEY)
|
||||
if user:
|
||||
user_id: str = request.session.get(USER_KEY)
|
||||
user: typing.Optional[User] = None
|
||||
if user_id:
|
||||
try:
|
||||
if user == ROOT_ID:
|
||||
if user_id == ROOT_ID:
|
||||
user = getRootUser()
|
||||
else:
|
||||
user = User.objects.get(pk=user)
|
||||
user = User.objects.get(pk=user_id)
|
||||
except User.DoesNotExist:
|
||||
user = None
|
||||
|
||||
logger.debug('User at Middleware: %s %s', user_id, user)
|
||||
request.user = user
|
||||
|
Loading…
Reference in New Issue
Block a user