1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-24 02:04:09 +03:00

Added tests for UDS middlewares

This commit is contained in:
Adolfo Gómez García 2022-12-09 02:58:17 +01:00
parent e292e726d3
commit 4d38b61abc
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
9 changed files with 390 additions and 27 deletions

View File

@ -32,11 +32,8 @@ import typing
import functools
import logging
from django.conf import settings
from uds import models
from uds.core import VERSION
from uds.REST.handlers import AUTH_TOKEN_HEADER
from ...utils import rest
@ -107,7 +104,7 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
unique_id = self.user_service_managed.getUniqueId()
success = functools.partial(self.invoke_success, 'managed', actor_token)
failure = functools.partial(self.invoke_failure, 'managed')
failure = functools.partial(self.invoke_failure, 'managed')
result = success(unique_id)

View File

@ -0,0 +1,167 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
from django.urls import reverse
from uds.core.util import config
from uds.core.auths.auth import AUTHORIZED_KEY
from ....utils.web import test
if typing.TYPE_CHECKING:
from uds.core.util.request import ExtendedHttpRequestWithUser
logger = logging.getLogger(__name__)
class GlobalRequestMiddlewareTest(test.WEBTestCase):
"""
Test actor functionality
"""
def test_global_request_no_login_ipv4(self) -> None:
GlobalRequestMiddlewareTest.add_middleware(
'uds.core.util.middleware.request.GlobalRequestMiddleware'
)
self.client.enable_ipv4()
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = False, not logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), False)
# Ensure ip, and ip_proxy are set and both are the same, 127.0.0.1
self.assertEqual(request.ip, '127.0.0.1')
self.assertEqual(request.ip_proxy, '127.0.0.1')
self.assertEqual(request.ip_version, 4)
# Ensure user is not set
self.assertEqual(request.user, None)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_global_request_no_login_ipv6(self) -> None:
GlobalRequestMiddlewareTest.add_middleware(
'uds.core.util.middleware.request.GlobalRequestMiddleware'
)
self.client.enable_ipv6()
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = False, not logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), False)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '::1')
self.assertEqual(request.ip_proxy, '::1')
self.assertEqual(request.ip_version, 6)
# Ensure user is not set
self.assertEqual(request.user, None)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_global_request_login_ipv4(self) -> None:
GlobalRequestMiddlewareTest.add_middleware(
'uds.core.util.middleware.request.GlobalRequestMiddleware'
)
self.client.enable_ipv4()
user = self.login(as_admin=False)
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = True, logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), True)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '127.0.0.1')
self.assertEqual(request.ip_proxy, '127.0.0.1')
self.assertEqual(request.ip_version, 4)
# Ensure user is correct
self.assertEqual(request.user.uuid, user.uuid)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_global_request_login_ipv6(self) -> None:
GlobalRequestMiddlewareTest.add_middleware(
'uds.core.util.middleware.request.GlobalRequestMiddleware'
)
self.client.enable_ipv6()
user = self.login(as_admin=False)
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = True, logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), True)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '::1')
self.assertEqual(request.ip_proxy, '::1')
self.assertEqual(request.ip_version, 6)
# Ensure user is correct
self.assertEqual(request.user.uuid, user.uuid)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_no_middleware(self) -> None:
# Ensure GlobalRequestMiddleware is not present
GlobalRequestMiddlewareTest.remove_middleware('uds.core.util.middleware.request.GlobalRequestMiddleware')
self.client.enable_ipv4()
response = self.client.get('/', secure=False)
request = response.wsgi_request
# session[AUTHORIZED_KEY] is not present
self.assertEqual(AUTHORIZED_KEY in request.session, False)
# ip is not present, nor ip_proxy or ip_version
self.assertEqual(hasattr(request, 'ip'), False)
self.assertEqual(hasattr(request, 'ip_proxy'), False)
self.assertEqual(hasattr(request, 'ip_version'), False)
# Also, user is not present
self.assertEqual(hasattr(request, 'user'), False)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
from django.urls import reverse
from uds.core.util import config
from uds.core.managers.crypto import CryptoManager
from uds.core.util.middleware.redirect import _NO_REDIRECT
from ....utils import test
logger = logging.getLogger(__name__)
class RedirectMiddlewareTest(test.UDSTransactionTestCase):
"""
Test actor functionality
"""
def test_redirect(self):
RedirectMiddlewareTest.add_middleware('uds.core.util.middleware.redirect.RedirectMiddleware')
config.GlobalConfig.REDIRECT_TO_HTTPS.set(True)
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, 'https://testserver/')
# Try secure, will redirect to index
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
# Try several urls, random. Unsecure will redirect, secure will not
for _ in range(32):
url = f'/{CryptoManager().randomString(32)}'
response = self.client.get(url, secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'https://testserver{url}')
response = self.client.get(url, secure=True)
self.assertEqual(response.status_code, 404) # Not found
# These urls will never redirect:
for url in _NO_REDIRECT:
# Append some random string to avoid cache and make a 404 occur
url = f'/{url}/{CryptoManager().randomString(32)}'
response = self.client.get(url, secure=False)
# every url will return 404, except /uds/rest/client that will return 400 and wyse or servlet that will return 302
if url.startswith('/uds/rest/client'):
self.assertEqual(response.status_code, 400)
elif url.startswith('/wyse') or url.startswith('/servlet'):
self.assertEqual(response.status_code, 302)
else:
self.assertEqual(response.status_code, 404)

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
from django.urls import reverse
from uds.core.util import config
from uds.core.util.middleware.redirect import _NO_REDIRECT
from ....utils import test
logger = logging.getLogger(__name__)
class SecurityMiddlewareTest(test.UDSTransactionTestCase):
"""
Test actor functionality
"""
def test_security(self) -> None:
SecurityMiddlewareTest.add_middleware('uds.core.util.middleware.security.UDSSecurityMiddleware')
# No trusted sources
config.GlobalConfig.TRUSTED_SOURCES.set('')
# Without user agent, security middleware will return forbidden (403) if not Trusted IP
# With user agent, it will process normally (/ will redirect to index, for example, and index will return 200)
# If user agent contains "bot" or "spider" it will return 403 in all cases
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
# Try secure, will redirect to index also
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
# Remove user agent, will return 403
self.client.set_user_agent(None)
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 403)
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 403)
# Bots also are denied
self.client.set_user_agent('Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)')
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 403)
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 403)
# Add trusted ip, 127.0.0.1
config.GlobalConfig.TRUSTED_SOURCES.set('127.0.0.1')
# Bot will be denied anyway, even if trusted source
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 403)
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 403)
# Emtpy user agent will be allowed from trusted source
self.client.set_user_agent(None)
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))

View File

@ -59,6 +59,7 @@ class UDSClientMixin:
headers: typing.Dict[str, str] = {
'HTTP_USER_AGENT': 'Testing user agent',
}
ip_version: int = 4
def initialize(self):
# Ensure only basic middleware are enabled.
@ -78,11 +79,23 @@ class UDSClientMixin:
def add_header(self, name: str, value: str):
self.headers[name] = value
# Use "BEFORE" first client use
def add_middelware(self, middleware: str) -> None:
if middleware not in settings.MIDDLEWARE:
settings.MIDDLEWARE.append(middleware)
def set_user_agent(self, user_agent: typing.Optional[str] = None):
user_agent = user_agent or ''
# Add 'HTTP_USER_AGENT' header
self.headers['HTTP_USER_AGENT'] = user_agent
def enable_ipv4(self):
self.ip_version = 4
def enable_ipv6(self):
self.ip_version = 6
def append_remote_addr(self, kwargs: typing.Dict[str, typing.Any]) -> None:
if self.ip_version == 4:
kwargs['REMOTE_ADDR'] = '127.0.0.1'
elif self.ip_version == 6:
kwargs['REMOTE_ADDR'] = '::1'
class UDSClient(UDSClientMixin, Client):
def __init__(
@ -107,13 +120,14 @@ class UDSClient(UDSClientMixin, Client):
return super().request(**request)
def get(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().get(*args, **kwargs))
def post(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().post(*args, **kwargs))
class UDSAsyncClient(UDSClientMixin, AsyncClient):
def __init__(
self,
@ -137,31 +151,42 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
return await super().request(**request)
async def get(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().get(*args, **kwargs))
async def post(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().post(*args, **kwargs))
class UDSTestCase(TestCase):
class UDSTestCaseMixin:
client_class: typing.Type = UDSClient
async_client_class: typing.Type = UDSAsyncClient
client: UDSClient
async_client: UDSAsyncClient
@staticmethod
def add_middleware(middleware: str) -> None:
if middleware not in settings.MIDDLEWARE:
settings.MIDDLEWARE.append(middleware)
@staticmethod
def remove_middleware(middleware: str) -> None:
# Remove middleware from settings, if present
try:
settings.MIDDLEWARE.remove(middleware)
except ValueError:
pass # Not present
class UDSTestCase(UDSTestCaseMixin, TestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
setupClass(cls) # The one local to this module
class UDSTransactionTestCase(TransactionTestCase):
client_class: typing.Type = UDSClient
async_client_class: typing.Type = UDSAsyncClient
client: UDSClient
async_client: UDSAsyncClient
class UDSTransactionTestCase(UDSTestCaseMixin, TransactionTestCase):
@classmethod
def setUpClass(cls) -> None:

View File

@ -94,11 +94,12 @@ class WEBTestCase(test.UDSTransactionTestCase):
def login(
self, user: typing.Optional[models.User] = None, as_admin: bool = True
) -> None:
) -> models.User:
'''
Login as specified user or first admin
'''
user = user or (self.admins[0] if as_admin else self.staffs[0])
self.do_login(user.name, user.name, user.manager.uuid)
return user

View File

@ -28,8 +28,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import json
import codecs
import logging
import typing

View File

@ -65,6 +65,9 @@ def registerException(path: str) -> None:
def _check_redirectable(request: 'HttpRequest') -> typing.Optional['HttpResponse']:
if GlobalConfig.REDIRECT_TO_HTTPS.getBool() is False or request.is_secure():
return None
full_path = request.get_full_path()
redirect = True
for nr in _NO_REDIRECT:
@ -72,13 +75,8 @@ def _check_redirectable(request: 'HttpRequest') -> typing.Optional['HttpResponse
redirect = False
break
if (
redirect
and not request.is_secure()
and GlobalConfig.REDIRECT_TO_HTTPS.getBool()
):
if redirect:
if request.method == 'POST':
# url = request.build_absolute_uri(GlobalConfig.LOGIN_URL.get())
url = reverse('page.login')
else:
url = request.build_absolute_uri(full_path)

View File

@ -52,7 +52,7 @@ bot = re.compile(r'bot|spider', re.IGNORECASE)
def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpResponse']:
ua = request.META.get('HTTP_USER_AGENT', 'Unknown')
ua = request.META.get('HTTP_USER_AGENT', '') or 'Unknown'
# If bot, break now
if bot.search(ua) or (ua == 'Unknown' and not isTrustedSource(request.ip)):
# Return emty response if bot is detected