From d0e9c96573907ea59ce72bf7169703f9c777df2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Thu, 25 Aug 2022 22:28:57 +0200 Subject: [PATCH] Added new test and make pytests use parallel execution of tests using pytest-xdist --- server/src/pytest.ini | 2 +- server/src/tests/.gitignore | 1 + server/src/tests/core/managers/__init__.py | 33 +++++ server/src/tests/core/managers/test_crypto.py | 122 ++++++++++++++++++ server/src/tests/utils/test.py | 16 ++- server/src/tests/utils/web/test.py | 5 +- .../src/tests/web/user/test_login_logout.py | 4 +- 7 files changed, 175 insertions(+), 8 deletions(-) create mode 100644 server/src/tests/.gitignore create mode 100644 server/src/tests/core/managers/test_crypto.py diff --git a/server/src/pytest.ini b/server/src/pytest.ini index 829a3771..09a0a710 100644 --- a/server/src/pytest.ini +++ b/server/src/pytest.ini @@ -1,4 +1,4 @@ [pytest] DJANGO_SETTINGS_MODULE = server.settings python_files = tests.py test_*.py *_tests.py -addopts = --cov --cov-report html --cov-config=coverage.ini +addopts = --cov --cov-report html --cov-config=coverage.ini -n 12 diff --git a/server/src/tests/.gitignore b/server/src/tests/.gitignore new file mode 100644 index 00000000..a1302fbe --- /dev/null +++ b/server/src/tests/.gitignore @@ -0,0 +1 @@ +/enterprise diff --git a/server/src/tests/core/managers/__init__.py b/server/src/tests/core/managers/__init__.py index e69de29b..3e05ba69 100644 --- a/server/src/tests/core/managers/__init__.py +++ b/server/src/tests/core/managers/__init__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2022 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +@author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +from . import test_crypto +from . import test_downloads diff --git a/server/src/tests/core/managers/test_crypto.py b/server/src/tests/core/managers/test_crypto.py new file mode 100644 index 00000000..dd91a9a8 --- /dev/null +++ b/server/src/tests/core/managers/test_crypto.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2022 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +@author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import typing +import datetime + +from django.conf import settings + +from uds.core.managers.crypto import CryptoManager +from ...utils.test import UDSTestCase + +class CryptoManagerTests(UDSTestCase): + manager: CryptoManager = CryptoManager() + + + def test_RSA(self): + testStr = 'Test string' + cryptStr = self.manager.encrypt(testStr) + + self.assertIsInstance(cryptStr, str, 'Crypted string is not unicode') + + decryptStr = self.manager.decrypt(cryptStr) + + self.assertIsInstance(decryptStr, str, 'Decrypted string is not unicode') + self.assertEqual( + decryptStr, + testStr, + 'Decrypted test string failed!: {} vs {}'.format(decryptStr, testStr), + ) + + def test_Xor(self): + testStr1a = 'Test String more or less with strange chars €@"áéöüìùòàäñÑ@æßðđŋħ←↓→þøŧ¶€ł@łĸµn”“«»“”nµłĸŋđðßææ@ł€¶ŧ←↓→øþ' + testStr1b = 'Test String 2 with some ł€¶ŧ←↓→øþ' + + testStr2a = 'xor string chasquera' + testStr2b = 'xor string chasquera #~½¬æßð' + + for s1 in (testStr1a, testStr1b): + for s2 in (testStr2a, testStr2b): + xor = self.manager.xor(s1, s2) + self.assertIsInstance(xor, bytes, 'Returned xor string is not bytes') + xorxor = self.manager.xor(xor, s2) + self.assertEqual(xorxor.decode('utf-8'), s1) + + def test_Symcrypt(self): + testStr1a = 'Test String more or less with strange chars €@"áéöüìùòàäñÑ@æßðđŋħ←↓→þøŧ¶€ł@łĸµn”“«»“”nµłĸŋđðßææ@ł€¶ŧ←↓→øþ' + testStr1b = 'Test String 2 with some ł€¶ŧ←↓→øþ' + + testStr2a = 'xor string chasquera' + testStr2b = 'xor string chasquera #~½¬æßð' + + for s1 in (testStr1a, testStr1b): + for s2 in (testStr2a, testStr2b): + sym = self.manager.symCrypt(s1, s2) + self.assertIsInstance(sym, bytes, 'Returned xor string is not bytes') + symd = self.manager.symDecrpyt(sym, s2) + self.assertEqual(symd, s1) + + + def test_Certs(self): + # Right now, only tests that these methods do not fails + self.manager.loadPrivateKey(settings.RSA_KEY) + + self.manager.loadCertificate(settings.CERTIFICATE) + self.manager.loadCertificate(settings.CERTIFICATE.encode('utf8')) + + def test_Hash(self): + testStr = 'Test String for hash' + # knownHashValue = '4e1311c1378993b34430988f4836b8e6b8beb219' + + for _ in (testStr, testStr.encode('utf-8')): + hashValue = self.manager.hash(testStr) + self.assertIsInstance(hashValue, str, 'Returned hash must be an string') + + def test_Uuid(self): + self.manager._counter = 0 + self.assertIsInstance(self.manager.uuid(), str) + self.assertEqual(1, self.manager._counter, 'Counter has note been incremented!') + + for o in ( + (1, '47c69004-5f4c-5266-b93d-747b318e2d3f'), + (1.1, 'dfdae060-00a9-5e8d-9a28-3b77b8af18eb'), + ('Test String', 'dce56818-2231-5d0f-abd3-73b3b8c1c7ee'), + ( + datetime.datetime(2014, 9, 15, 17, 2, 12), + 'a42521d7-2b2f-5767-992c-482aef05b25c', + ), + ): + uuid = self.manager.uuid(o[0]) + self.assertIsInstance(uuid, str, 'Returned uuid must be an string') + self.assertEqual(uuid, o[1]) + + self.assertEqual(1, self.manager._counter, 'Counter has note been incremented!') diff --git a/server/src/tests/utils/test.py b/server/src/tests/utils/test.py index bcccf8b2..f1f7ac6a 100644 --- a/server/src/tests/utils/test.py +++ b/server/src/tests/utils/test.py @@ -43,25 +43,30 @@ from uds.core.managers.crypto import CryptoManager logger = logging.getLogger(__name__) + class UDSHttpResponse(HttpResponse): """ Custom response class to be able to access the response content """ + def __init__(self, content, *args, **kwargs): super().__init__(content, *args, **kwargs) self.content = content - def json(self) -> typing.Any: return super().json() # type: ignore + class UDSClient(Client): headers: typing.Dict[str, str] = { 'HTTP_USER_AGENT': 'Testing user agent', } def __init__( - self, enforce_csrf_checks: bool =False, raise_request_exception: bool=True, **defaults: typing.Any + self, + enforce_csrf_checks: bool = False, + raise_request_exception: bool = True, + **defaults: typing.Any ): # Ensure only basic middleware are enabled. settings.MIDDLEWARE = [ @@ -73,8 +78,13 @@ class UDSClient(Client): 'uds.core.util.middleware.request.GlobalRequestMiddleware', ] + # Update settings security options + settings.RSA_KEY = '-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDcANi/08cnpn04\njKW/o2G1k4SIa6dJks8DmT4MQHOWqYC46YSIIPzqGoBPcvkbDSPSFBnByo3HhMY+\nk4JHc9SUwEmHSJWDCHjt7XSXX/ryqH0QQIJtSjk9Bc+GkOU24mnbITiw7ORjp7VN\nvgFdFhjVsZM/NjX/6Y9DoCPC1mGj0O9Dd4MfCsNwUxRhhR6LdrEnpRUSVW0Ksxbz\ncTfpQjdFr86+1BeUbzqN2HDcEGhvioj+0lGPXcOZoRNYU16H7kjLNP+o+rC7f/q/\nfoOYLzDSkmzePbcG+g0Hv7K7fuLus05ZWjupOmJA9hytB1BIF4p5f4ewl05Fx2Zj\nG2LneO2fAgMBAAECggEBANDimOnh2TkDceeMWx+OsAooC3E/zbEkjBudl3UoiNcn\nD0oCpkxeDeT0zpkgz/ZoTnd7kE0Y1e73WQc3JT5UcyXdQLMLLrIgDDnT+Jx1jB5z\n7XLN3UiJbblL2BOrZYbsCJf/fgU2l08rgBBVdJP+lAvps6YUAcd+6gDKfsnSpRhU\nWBHLZde7l6vUJ2OK9ZmHaghF5E8Xx918OSUKFJfGTYL5JLTb/scdl8vQse1quWC1\nk48PPXK10vOFvYWonQpRb2cOK/PPjPXPNWzcQyQY9D1iOeFvRyLqOXYE/ZY+qDe2\nHdPGrkl67yz01nzepkWWg/ZNbMXeZZyOnZm0aXtOxtkCgYEA/Qz3mescgwrt67yh\nFrbXjUqiVf2IpbNt88CUcbY0r1EdTA9OMtOtPYNvfpyRIRfDaZJ1zAdh3CZ2/hTm\ng+VUtseKnUDCi0xIBKX3V2O8sryWt2KStTnTo6JP0T47yXvmaRu5cutgoaD9SK+r\nN5vg1D2gNLmsT8uJh1Bl/yWGC4sCgYEA3pFGgAmiywsvmsddkI+LujoQVTiqkfFg\nMHHsJFOZlhYO83g49Q11pcQ70ukT6e89Ggwy///+z19p8jJ+wGqQWQLsM6eO1utg\nnJ8wMTwk8tOEm9MnWnnWhtG9KWcgkmwOVQiesJdWa1xOqsBKGchUkugmFycKNsiG\nHUbogbJ0OL0CgYBVLIcuxKdNKGGaxlwGVDbLdQKdJQBYncN1ly2f9K9ZD1loH4K3\nsu4N1W6y1Co5VFFO+KAzs4xp2HyW2xwX6xoPh6yNb53L2zombmKJhKWgF8A3K7Or\n0jH9UwXArUzcbZrJaC6MktNss85tJ8vepNYROkjxVkm8dgrtg89BCTVMLwKBgQCW\nSSh+uoL3cdUyQV63h4ZFOIHg2cOrin52F+bpXJ3/z2NHGa30IqOHTGtM7l+o/geX\nOBeT72tC4d2rUlduXEaeJDAUbRcxnnx9JayoAkG8ygDoK3uOR2kJXkTJ2T4QQPCo\nkIp/GaGcGxdviyo+IJyjGijmR1FJTrvotwG22iZKTQKBgQCIh50Dz0/rqZB4Om5g\nLLdZn1C8/lOR8hdK9WUyPHZfJKpQaDOlNdiy9x6xD6+uIQlbNsJhlDbOudHDurfI\nghGbJ1sy1FUloP+V3JAFS88zIwrddcGEso8YMFMCE1fH2/q35XGwZEnUq7ttDaxx\nHmTQ2w37WASIUgCl2GhM25np0Q==\n-----END PRIVATE KEY-----\n' + settings.CERTIFICATE = '-----BEGIN CERTIFICATE-----\nMIICzTCCAjYCCQCOUQEWpuEa3jANBgkqhkiG9w0BAQUFADCBqjELMAkGA1UEBhMC\nRVMxDzANBgNVBAgMBk1hZHJpZDEUMBIGA1UEBwwLQWxjb3Jjw4PCs24xHTAbBgNV\nBAoMFFZpcnR1YWwgQ2FibGUgUy5MLlUuMRQwEgYDVQQLDAtEZXZlbG9wbWVudDEY\nMBYGA1UEAwwPQWRvbGZvIEfDg8KzbWV6MSUwIwYJKoZIhvcNAQkBFhZhZ29tZXpA\ndmlydHVhbGNhYmxlLmVzMB4XDTEyMDYyNTA0MjM0MloXDTEzMDYyNTA0MjM0Mlow\ngaoxCzAJBgNVBAYTAkVTMQ8wDQYDVQQIDAZNYWRyaWQxFDASBgNVBAcMC0FsY29y\nY8ODwrNuMR0wGwYDVQQKDBRWaXJ0dWFsIENhYmxlIFMuTC5VLjEUMBIGA1UECwwL\nRGV2ZWxvcG1lbnQxGDAWBgNVBAMMD0Fkb2xmbyBHw4PCs21lejElMCMGCSqGSIb3\nDQEJARYWYWdvbWV6QHZpcnR1YWxjYWJsZS5lczCBnzANBgkqhkiG9w0BAQEFAAOB\njQAwgYkCgYEA35iGyHS/GVdWk3n9kQ+wsCLR++jd9Vez/s407/natm8YDteKksA0\nMwIvDAX722blm8PUya2NOlnum8KdyUPDOq825XERDlsIA+sTd6lb1c7w44qZ/pb+\n68mhXoRx2VJsu//+zhBkaQ1/KcugeHa4WLRIH35YLxdQDxrXS1eQWccCAwEAATAN\nBgkqhkiG9w0BAQUFAAOBgQAk+fJPpY+XvUsxR2A4SaQ8TGnE2x4PtpwCrCVzKEU9\nW2ugdXvysxkHbib3+JdA6s+lJjHs5HiMZPo/ak8adEKke+d10EU5YcUaJRRUpStY\nqQHziaqOl5Hgi75Kjskq6+tCU0Iui+s9pBg0V6y1AQsCmH2xFs7t1oEOGRFVarfF\n4Q==\n-----END CERTIFICATE-----' + # Instantiate the client and add basic user agent super().__init__(enforce_csrf_checks, raise_request_exception) + # and required UDS cookie self.cookies['uds'] = CryptoManager().randomString(48) @@ -88,7 +98,6 @@ class UDSClient(Client): request.update(self.headers) return super().request(**request) - def get(self, *args, **kwargs) -> 'UDSHttpResponse': return typing.cast('UDSHttpResponse', super().get(*args, **kwargs)) @@ -101,6 +110,7 @@ class UDSTestCase(TestCase): client: UDSClient + class UDSTransactionTestCase(TransactionTestCase): client_class: typing.Type = UDSClient diff --git a/server/src/tests/utils/web/test.py b/server/src/tests/utils/web/test.py index d674d60a..505ce316 100644 --- a/server/src/tests/utils/web/test.py +++ b/server/src/tests/utils/web/test.py @@ -76,7 +76,7 @@ class WEBTestCase(test.UDSTransactionTestCase): self.provider = fixtures.services.createProvider() - def do_login(self, username: str, password: str, authid: str) -> 'test.UDSHttpResponse': + def do_login(self, username: str, password: str, authid: str, check: bool = False) -> 'test.UDSHttpResponse': response = typing.cast( 'test.UDSHttpResponse', self.client.post( @@ -88,7 +88,8 @@ class WEBTestCase(test.UDSTransactionTestCase): }, ), ) - self.assertRedirects(response, '/uds/page/services', status_code=302, target_status_code=200) + if check: + self.assertRedirects(response, '/uds/page/services', status_code=302, target_status_code=200) return response def login( diff --git a/server/src/tests/web/user/test_login_logout.py b/server/src/tests/web/user/test_login_logout.py index 9e410da0..2765ddd2 100644 --- a/server/src/tests/web/user/test_login_logout.py +++ b/server/src/tests/web/user/test_login_logout.py @@ -102,7 +102,7 @@ class WebLoginLogout(test.WEBTestCase): # Ensure web login for super user is disabled and that the root login fails GlobalConfig.SUPER_USER_ALLOW_WEBACCESS.set(False) - response = self.do_login(root, rootpass, auth.uuid) + response = self.do_login(root, rootpass, auth.uuid, False) self.assertInvalidLogin(response) # Esure invalid password for root user is not allowed @@ -129,7 +129,7 @@ class WebLoginLogout(test.WEBTestCase): is_staff=True, )[0] - response = self.do_login(user.name, user.name, user.manager.uuid) + response = self.do_login(user.name, user.name, user.manager.uuid, False) self.assertInvalidLogin(response) self.assertEqual(models.Log.objects.count(), 4)