From 3bf7af2e3dbe4f844b4856b872af0f5cc0d938c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Mon, 26 Feb 2024 05:50:36 +0100 Subject: [PATCH] Fixing typing also on tests --- server/src/uds/core/ui/user_interface.py | 2 +- server/tests/REST/actor/test_test.py | 19 +- server/tests/REST/servers/test_register.py | 4 +- server/tests/REST/test_system.py | 4 +- .../auths/simple_ldap/test_serialization.py | 6 +- server/tests/core/jobs/test_scheduler.py | 9 +- server/tests/core/managers/test_downloads.py | 4 +- server/tests/core/models/test_account.py | 7 +- server/tests/core/models/test_model_uuid.py | 2 +- server/tests/core/reports/test_graphs.py | 10 +- server/tests/core/ui/test_gui.py | 2 +- .../tests/core/ui/test_userinterface_data.py | 18 +- .../ui/test_userinterface_serialization.py | 14 +- .../tests/core/util/test_auto_serializable.py | 8 +- server/tests/core/util/test_cache.py | 2 +- server/tests/core/util/test_calendar.py | 10 +- server/tests/core/util/test_permissions.py | 280 ++++++------------ server/tests/core/util/test_storage.py | 8 +- server/tests/core/util/test_uniqueid.py | 78 ++--- .../core/workers/test_assigned_unused.py | 4 +- .../test_hanged_userservice_cleaner.py | 4 +- .../tests/core/workers/test_stuck_cleaner.py | 4 +- server/tests/middleware/test_redirect.py | 2 +- .../__init__.py | 0 .../test_serialization.py | 0 .../test_serialization_deployment.py | 6 +- .../xen/test_serialization_deployment.py | 6 +- server/tests/utils/rest/__init__.py | 6 +- server/tests/utils/rest/assertions.py | 8 +- server/tests/utils/test.py | 19 +- server/tests/utils/web/__init__.py | 2 +- server/tests/web/user/test_login_logout.py | 6 +- 32 files changed, 226 insertions(+), 328 deletions(-) rename server/tests/osmanagers/{windows_randompass_osmanager copy => windows_randompass_osmanager}/__init__.py (100%) rename server/tests/osmanagers/{windows_randompass_osmanager copy => windows_randompass_osmanager}/test_serialization.py (100%) diff --git a/server/src/uds/core/ui/user_interface.py b/server/src/uds/core/ui/user_interface.py index 510500638..f81716aa6 100644 --- a/server/src/uds/core/ui/user_interface.py +++ b/server/src/uds/core/ui/user_interface.py @@ -773,7 +773,7 @@ class gui: return self.as_date() @value.setter - def value(self, value: datetime.date) -> None: + def value(self, value: datetime.date|str) -> None: self._set_value(value) def gui_description(self) -> dict[str, typing.Any]: diff --git a/server/tests/REST/actor/test_test.py b/server/tests/REST/actor/test_test.py index 19fc0530c..3e6ea6e71 100644 --- a/server/tests/REST/actor/test_test.py +++ b/server/tests/REST/actor/test_test.py @@ -29,6 +29,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ import logging +import typing from uds.core import consts from uds.core.consts.actor import UNMANAGED @@ -59,25 +60,25 @@ class ActorTestTest(rest.test.RESTActorTestCase): self.assertEqual(response.json()['error'], 'invalid token') # - success = lambda: self.client.post( + _success: typing.Callable[[], typing.Any] = lambda: self.client.post( '/uds/rest/actor/v3/test', data={'type': type_, 'token': token}, content_type='application/json', ) - invalid = lambda: self.client.post( + _invalid: typing.Callable[[], typing.Any] = lambda: self.client.post( '/uds/rest/actor/v3/test', data={'type': type_, 'token': 'invalid'}, content_type='application/json', ) # Invalid actor token also fails - response = invalid() + response = _invalid() self.assertEqual(response.status_code, 200) self.assertEqual(response.json()['error'], 'invalid token') # This one works - response = success() + response = _success() self.assertEqual(response.status_code, 200) self.assertEqual(response.json()['result'], 'ok') @@ -85,20 +86,20 @@ class ActorTestTest(rest.test.RESTActorTestCase): # And this one too, without authentication token # Without header, test will success because its not authenticated self.client.add_header(consts.auth.AUTH_TOKEN_HEADER, 'invalid') - response = success() + response = _success() self.assertEqual(response.status_code, 200) self.assertEqual(response.json()['result'], 'ok') # We have ALLOWED_FAILS until we get blocked for a while # Next one will give 403 - for a in range(consts.system.ALLOWED_FAILS): - response = invalid() + for _i in range(consts.system.ALLOWED_FAILS): + response = _invalid() self.assertEqual(response.status_code, 200) self.assertEqual(response.json()['error'], 'invalid token') # And this one will give 403 - response = invalid() + response = _invalid() self.assertEqual(response.status_code, 403) def test_test_managed(self) -> None: @@ -108,7 +109,7 @@ class ActorTestTest(rest.test.RESTActorTestCase): def test_test_unmanaged(self) -> None: # try for a first few services service = self.user_service_managed.deployed_service.service - actor_token = self.login_and_register() + _actor_token = self.login_and_register() # Get service token self.do_test(UNMANAGED, service.token or '') diff --git a/server/tests/REST/servers/test_register.py b/server/tests/REST/servers/test_register.py index c8b5bd564..7c308975a 100644 --- a/server/tests/REST/servers/test_register.py +++ b/server/tests/REST/servers/test_register.py @@ -113,7 +113,7 @@ class ServerRegisterTest(rest.test.RESTTestCase): self._data2['mac'] = random_mac() self._data2['os'] = ( types.os.KnownOS.UNKNOWN.value[0] - if os != types.os.KnownOS.UNKNOWN + if os != types.os.KnownOS.UNKNOWN.value[0] else types.os.KnownOS.WINDOWS.value[0] ) response = self.client.rest_post( @@ -135,8 +135,6 @@ class ServerRegisterTest(rest.test.RESTTestCase): # Rest of fields should be the same def test_invalid_register(self) -> None: - response: 'UDSHttpResponse' - def _do_test(where: str) -> None: response = self.client.rest_post( 'servers/register', diff --git a/server/tests/REST/test_system.py b/server/tests/REST/test_system.py index 0640726c0..4ef8ccbf1 100644 --- a/server/tests/REST/test_system.py +++ b/server/tests/REST/test_system.py @@ -44,7 +44,7 @@ logger = logging.getLogger(__name__) class SystemTest(rest.test.RESTTestCase): - def test_overview(self): + def test_overview(self) -> None: # If not logged in, will fail response = self.client.rest_get('system/overview') self.assertEqual(response.status_code, 403) @@ -70,7 +70,7 @@ class SystemTest(rest.test.RESTTestCase): self.assertEqual(json['restrained_services_pools'], 0) - def test_chart_pool(self): + def test_chart_pool(self) -> None: # First, create fixtures for the pool DAYS = 30 for pool in [self.user_service_managed, self.user_service_unmanaged]: diff --git a/server/tests/auths/simple_ldap/test_serialization.py b/server/tests/auths/simple_ldap/test_serialization.py index 06dd33b2c..572936dc6 100644 --- a/server/tests/auths/simple_ldap/test_serialization.py +++ b/server/tests/auths/simple_ldap/test_serialization.py @@ -71,7 +71,7 @@ SERIALIZED_AUTH_DATA: typing.Final[typing.Mapping[str, bytes]] = { class SimpleLdapSerializationTest(UDSTestCase): - def check_provider(self, version: str, instance: 'authenticator.SimpleLDAPAuthenticator'): + def check_provider(self, version: str, instance: 'authenticator.SimpleLDAPAuthenticator') -> None: self.assertEqual(instance.host.as_str(), 'host') self.assertEqual(instance.port.as_int(), 166) self.assertEqual(instance.use_ssl.as_bool(), True) @@ -91,14 +91,14 @@ class SimpleLdapSerializationTest(UDSTestCase): self.assertEqual(instance.verify_ssl.as_bool(), True) self.assertEqual(instance.certificate.as_str(), 'cert') - def test_unmarshall_all_versions(self): + def test_unmarshall_all_versions(self) -> None: for v in range(1, len(SERIALIZED_AUTH_DATA) + 1): with Environment.temporary_environment() as env: instance = authenticator.SimpleLDAPAuthenticator(environment=env) instance.unmarshal(SERIALIZED_AUTH_DATA['v{}'.format(v)]) self.check_provider(f'v{v}', instance) - def test_marshaling(self): + def test_marshaling(self) -> None: # Unmarshall last version, remarshall and check that is marshalled using new marshalling format LAST_VERSION = 'v{}'.format(len(SERIALIZED_AUTH_DATA)) with Environment.temporary_environment() as env: diff --git a/server/tests/core/jobs/test_scheduler.py b/server/tests/core/jobs/test_scheduler.py index 3fd7e1abd..63b252da3 100644 --- a/server/tests/core/jobs/test_scheduler.py +++ b/server/tests/core/jobs/test_scheduler.py @@ -33,6 +33,7 @@ import time import threading from tracemalloc import stop +import typing from unittest import mock from django.test import TransactionTestCase @@ -57,24 +58,24 @@ class SchedulerTest(TransactionTestCase): ) as mock_ensure_jobs_registered: left = 4 - def our_execute_job(*args, **kwargs) -> None: + def _our_execute_job(*args: typing.Any, **kwargs: typing.Any) -> None: nonlocal left left -= 1 if left == 0: sch.notify_termination() - mock_execute_job.side_effect = our_execute_job + mock_execute_job.side_effect = _our_execute_job # Execute run, but if it does not call execute_job, it will hang # so we execute a thread that will call notify_termination after 1 second stop_event = threading.Event() - def ensure_stops() -> None: + def _ensure_stops() -> None: stop_event.wait(scheduler.Scheduler.granularity * 10) if left > 0: sch.notify_termination() - watchdog = threading.Thread(target=ensure_stops) + watchdog = threading.Thread(target=_ensure_stops) watchdog.start() sch.run() diff --git a/server/tests/core/managers/test_downloads.py b/server/tests/core/managers/test_downloads.py index acfb393d5..cdcbd95e8 100644 --- a/server/tests/core/managers/test_downloads.py +++ b/server/tests/core/managers/test_downloads.py @@ -50,7 +50,7 @@ class DownloadsManagerTest(WEBTestCase): manager: DownloadsManager @classmethod - def setUpClass(cls): + def setUpClass(cls) -> None: from uds.core.managers import downloads_manager super().setUpClass() @@ -60,7 +60,7 @@ class DownloadsManagerTest(WEBTestCase): ) cls.manager = downloads_manager() - def test_downloads(self): + def test_downloads(self) -> None: for v in ( ('test.txt', 'text/plain', '1f47ec0a-1ad4-5d63-b41c-5d2befadab8d'), ( diff --git a/server/tests/core/models/test_account.py b/server/tests/core/models/test_account.py index 53f88512d..f4de37c98 100644 --- a/server/tests/core/models/test_account.py +++ b/server/tests/core/models/test_account.py @@ -132,9 +132,10 @@ class ModelAccountTest(UDSTestCase): acc = models.Account.objects.create(name='Test Account') for i in range(NUM_USERSERVICES): usage = acc.start_accounting(self.user_services[i]) - self.assertIsNotNone(usage) - usage.start = usage.start - datetime.timedelta(seconds=32 + i) # type: ignore - usage.save(update_fields=['start']) # type: ignore + if not usage: + self.fail('Usage not created') + usage.start = usage.start - datetime.timedelta(seconds=32 + i) + usage.save(update_fields=['start']) usage_end = acc.stop_accounting(self.user_services[i]) self.assertIsNotNone(usage_end) diff --git a/server/tests/core/models/test_model_uuid.py b/server/tests/core/models/test_model_uuid.py index 7a92fd06b..42867c25f 100644 --- a/server/tests/core/models/test_model_uuid.py +++ b/server/tests/core/models/test_model_uuid.py @@ -51,7 +51,7 @@ class ModelUUIDTest(UDSTestCase): self.group = authenticators_fixtures.create_groups(self.auth, 1)[0] self.user = authenticators_fixtures.create_users(self.auth, 1, groups=[self.group])[0] - def test_uuid_lowercase(self): + def test_uuid_lowercase(self) -> None: """ Tests that uuids are always lowercase """ diff --git a/server/tests/core/reports/test_graphs.py b/server/tests/core/reports/test_graphs.py index 6d8a23f77..daf6eb895 100644 --- a/server/tests/core/reports/test_graphs.py +++ b/server/tests/core/reports/test_graphs.py @@ -47,7 +47,7 @@ class GraphsTest(UDSTestCase): data1: dict[str, typing.Any] data2: dict[str, typing.Any] - def setUp(self): + def setUp(self) -> None: # Data must be a dict with the following keys: # - x: List of x values # - y: List of dicts with the following keys: @@ -109,7 +109,7 @@ class GraphsTest(UDSTestCase): } - def test_bar_chart(self): + def test_bar_chart(self) -> None: output = io.BytesIO() graphs.bar_chart((10, 8, 96), data=self.data1, output=output) value = output.getvalue() @@ -120,7 +120,7 @@ class GraphsTest(UDSTestCase): f.write(value) - def test_line_chart(self): + def test_line_chart(self) -> None: output = io.BytesIO() graphs.line_chart((10, 8, 96), data=self.data1, output=output) value = output.getvalue() @@ -131,7 +131,7 @@ class GraphsTest(UDSTestCase): f.write(value) - def test_surface_chart(self): + def test_surface_chart(self) -> None: output = io.BytesIO() graphs.surface_chart((10, 8, 96), data=self.data2, output=output) value = output.getvalue() @@ -143,7 +143,7 @@ class GraphsTest(UDSTestCase): f.write(value) - def test_surface_chart_wireframe(self): + def test_surface_chart_wireframe(self) -> None: self.data2['wireframe'] = True output = io.BytesIO() graphs.surface_chart((10, 8, 96), data=self.data2, output=output) diff --git a/server/tests/core/ui/test_gui.py b/server/tests/core/ui/test_gui.py index d76cb013d..884d37d7e 100644 --- a/server/tests/core/ui/test_gui.py +++ b/server/tests/core/ui/test_gui.py @@ -39,7 +39,7 @@ from django.conf import settings from uds.core.util import ensure class GuiTest(UDSTestCase): - def test_globals(self): + def test_globals(self) -> None: self.assertEqual(UDSK, settings.SECRET_KEY[8:24].encode()) self.assertEqual(UDSB, b'udsprotect') diff --git a/server/tests/core/ui/test_userinterface_data.py b/server/tests/core/ui/test_userinterface_data.py index d9c6151e5..747b55110 100644 --- a/server/tests/core/ui/test_userinterface_data.py +++ b/server/tests/core/ui/test_userinterface_data.py @@ -48,7 +48,7 @@ logger = logging.getLogger(__name__) class UserinterfaceInternalTest(UDSTestCase): - def test_value(self): + def test_value(self) -> None: # Asserts that data is correctly stored and retrieved ui = TestingUserInterface() self.assertEqual(ui.str_field.value, DEFAULTS['str_field']) @@ -88,7 +88,7 @@ class UserinterfaceInternalTest(UDSTestCase): self.assertEqual(ui.help_field.value, DEFAULTS['help_field']) - def test_default(self): + def test_default(self) -> None: ui = TestingUserInterface() # Now for default values self.assertEqual(ui.str_field.default, DEFAULTS['str_field']) @@ -104,7 +104,7 @@ class UserinterfaceInternalTest(UDSTestCase): self.assertEqual(ui.date_field.default, DEFAULTS['date_field']) self.assertEqual(ui.help_field.default, DEFAULTS['help_field']) - def test_references(self): + def test_references(self) -> None: ui = TestingUserInterface() # Ensure references are fine self.assertEqual(ui.str_field.value, ui._gui['str_field'].value) @@ -120,7 +120,7 @@ class UserinterfaceInternalTest(UDSTestCase): self.assertEqual(ui.date_field.value, ui._gui['date_field'].value) self.assertEqual(ui.help_field.value, ui._gui['help_field'].value) - def test_modify(self): + def test_modify(self) -> None: ui = TestingUserInterface() # Modify values, and recheck references ui.str_field.value = 'New value' @@ -148,7 +148,7 @@ class UserinterfaceInternalTest(UDSTestCase): ui.help_field.value = 'New value' self.assertEqual(ui.help_field.value, ui._gui['help_field'].value) - def test_valuesDict(self): + def test_valuesDict(self) -> None: ui = TestingUserInterface() self.assertEqual( ui.get_fields_as_dict(), @@ -168,7 +168,7 @@ class UserinterfaceInternalTest(UDSTestCase): }, ) - def test_labels(self): + def test_labels(self) -> None: ui = TestingUserInterface() self.assertEqual( { k: v.label for k, v in ui._gui.items() }, @@ -188,7 +188,7 @@ class UserinterfaceInternalTest(UDSTestCase): }, ) - def test_order(self): + def test_order(self) -> None: ui = TestingUserInterface() self.assertEqual( { k: v._fields_info.order for k, v in ui._gui.items() }, @@ -208,7 +208,7 @@ class UserinterfaceInternalTest(UDSTestCase): }, ) - def test_required(self): + def test_required(self) -> None: ui = TestingUserInterface() self.assertEqual( { k: v._fields_info.required for k, v in ui._gui.items() }, @@ -228,7 +228,7 @@ class UserinterfaceInternalTest(UDSTestCase): }, ) - def test_tooltip(self): + def test_tooltip(self) -> None: ui = TestingUserInterface() self.assertEqual( { k: v._fields_info.tooltip for k, v in ui._gui.items() }, diff --git a/server/tests/core/ui/test_userinterface_serialization.py b/server/tests/core/ui/test_userinterface_serialization.py index 67d3e54d7..bcabf1f54 100644 --- a/server/tests/core/ui/test_userinterface_serialization.py +++ b/server/tests/core/ui/test_userinterface_serialization.py @@ -39,7 +39,7 @@ import collections.abc from ...utils.test import UDSTestCase from uds.core import types, consts -from uds.core.ui.user_interface import gui +from uds.core.ui.user_interface import UserInterface from unittest import mock from ...fixtures.user_interface import ( @@ -52,7 +52,7 @@ from ...fixtures.user_interface import ( logger = logging.getLogger(__name__) -def old_serialize_form(ui) -> bytes: +def old_serialize_form(ui: 'UserInterface') -> bytes: """ All values stored at form fields are serialized and returned as a single string @@ -76,7 +76,7 @@ def old_serialize_form(ui) -> bytes: # Separators for fields, old implementation MULTIVALUE_FIELD: typing.Final[bytes] = b'\001' - OLD_PASSWORD_FIELD: typing.Final[bytes] = b'\004' + # OLD_PASSWORD_FIELD: typing.Final[bytes] = b'\004' PASSWORD_FIELD: typing.Final[bytes] = b'\005' FIELD_SEPARATOR: typing.Final[bytes] = b'\002' @@ -85,7 +85,7 @@ def old_serialize_form(ui) -> bytes: # import inspect # logger.debug('Caller is : {}'.format(inspect.stack())) - arr = [] + arr: list[bytes] = [] val: typing.Any for k, v in ui._gui.items(): logger.debug('serializing Key: %s/%s', k, v.value) @@ -147,7 +147,7 @@ class UserinterfaceTest(UDSTestCase): ) self.assertEqual(ui.date_field.value, DEFAULTS['date_field'], 'date_field') - def test_old_serialization(self): + def test_old_serialization(self) -> None: # This test is to ensure that old serialized data can be loaded # This data is from a ui = TestingUserInterface() @@ -165,7 +165,7 @@ class UserinterfaceTest(UDSTestCase): self.assertEqual(ui, ui3) self.ensure_values_fine(ui3) - def test_new_serialization(self): + def test_new_serialization(self) -> None: # This test is to ensure that new serialized data can be loaded # First ui = TestingUserInterface() @@ -176,7 +176,7 @@ class UserinterfaceTest(UDSTestCase): self.assertEqual(ui, ui2) self.ensure_values_fine(ui2) - def test_old_field_name(self): + def test_old_field_name(self) -> None: # This test is to ensure that new serialized data can be loaded # mock logging warning ui = TestingUserInterfaceFieldNameOrig() diff --git a/server/tests/core/util/test_auto_serializable.py b/server/tests/core/util/test_auto_serializable.py index 490769c1b..59994aac0 100644 --- a/server/tests/core/util/test_auto_serializable.py +++ b/server/tests/core/util/test_auto_serializable.py @@ -185,7 +185,7 @@ class AutoSerializable(UDSTestCase): def test_auto_serializable_base_encrypted(self) -> None: self.basic_check(AutoSerializableClass, AutoSerializableEncryptedClass) - def test_auto_serializable_derived(self): + def test_auto_serializable_derived(self) -> None: instance = DerivedAutoSerializableClass() instance.int_field = 1 instance.str_field = UNICODE_CHARS @@ -204,7 +204,7 @@ class AutoSerializable(UDSTestCase): self.assertEqual(instance, instance2) - def test_auto_serializable_derived_added(self): + def test_auto_serializable_derived_added(self) -> None: instance = DerivedAutoSerializableClass2() instance.int_field = 1 instance.str_field = UNICODE_CHARS @@ -225,7 +225,7 @@ class AutoSerializable(UDSTestCase): self.assertEqual(instance, instance2) - def test_auto_serializable_with_missing_fields(self): + def test_auto_serializable_with_missing_fields(self) -> None: instance = AutoSerializableClass() instance.int_field = 1 instance.str_field = UNICODE_CHARS @@ -250,7 +250,7 @@ class AutoSerializable(UDSTestCase): self.assertEqual(instance2.list_field, [1, 2, 3]) self.assertEqual(instance2.obj_nt_field, SerializableNamedTuple(2, '3', 4.0)) - def test_auto_serializable_with_added_fields(self): + def test_auto_serializable_with_added_fields(self) -> None: instance = AutoSerializableClassWithMissingFields() instance.int_field = 1 instance.bool_field = True diff --git a/server/tests/core/util/test_cache.py b/server/tests/core/util/test_cache.py index 9654379a9..f555b117b 100644 --- a/server/tests/core/util/test_cache.py +++ b/server/tests/core/util/test_cache.py @@ -44,7 +44,7 @@ VALUE_1 = [u'únîcödè€', b'string', {'a': 1, 'b': 2.0}] class CacheTest(UDSTransactionTestCase): - def test_cache(self): + def test_cache(self) -> None: cache = Cache(UNICODE_CHARS) # Get default value, with unicode diff --git a/server/tests/core/util/test_calendar.py b/server/tests/core/util/test_calendar.py index 3bc3537d0..98ffdd115 100644 --- a/server/tests/core/util/test_calendar.py +++ b/server/tests/core/util/test_calendar.py @@ -42,7 +42,7 @@ class CalendarTest(UDSTestCase): def setUp(self) -> None: createCalendars() - def test_calendar_dayly(self): + def test_calendar_dayly(self) -> None: cal = Calendar.objects.get(uuid='2cf6846b-d889-57ce-bb35-e647040a95b6') chk = calendar.CalendarChecker(cal) calendar.CalendarChecker.updates = 0 @@ -94,7 +94,7 @@ class CalendarTest(UDSTestCase): self.assertEqual(chk.updates, 366) - def test_calendar_weekly(self): + def test_calendar_weekly(self) -> None: cal = Calendar.objects.get(uuid='c1221a6d-3848-5fa3-ae98-172662c0f554') chk = calendar.CalendarChecker(cal) calendar.CalendarChecker.updates = 0 @@ -144,7 +144,7 @@ class CalendarTest(UDSTestCase): self.assertEqual(chk.updates, 365) - def test_calendar_monthly(self): + def test_calendar_monthly(self) -> None: cal = Calendar.objects.get(uuid='353c4cb8-e02d-5387-a18f-f634729fde81') chk = calendar.CalendarChecker(cal) calendar.CalendarChecker.updates = 0 @@ -186,7 +186,7 @@ class CalendarTest(UDSTestCase): self.assertEqual(chk.updates, 730) - def test_calendar_weekdays(self): + def test_calendar_weekdays(self) -> None: cal = Calendar.objects.get(uuid='bccfd011-605b-565f-a08e-80bf75114dce') chk = calendar.CalendarChecker(cal) calendar.CalendarChecker.updates = 0 @@ -229,7 +229,7 @@ class CalendarTest(UDSTestCase): self.assertEqual(chk.updates, 730) - def test_calendar_durations(self): + def test_calendar_durations(self) -> None: cal = Calendar.objects.get(uuid='60160f94-c8fe-5fdc-bbbe-325010980106') chk = calendar.CalendarChecker(cal) diff --git a/server/tests/core/util/test_permissions.py b/server/tests/core/util/test_permissions.py index 40760b7a7..79422fa24 100644 --- a/server/tests/core/util/test_permissions.py +++ b/server/tests/core/util/test_permissions.py @@ -35,6 +35,8 @@ import typing import collections.abc import uds.core.types.permissions +from django.db.models import Model + from uds.core.util import permissions from uds.core.util import objtype from uds import models @@ -62,9 +64,7 @@ class PermissionsTest(UDSTestCase): def setUp(self) -> None: self.authenticator = authenticators_fixtures.create_authenticator() self.groups = authenticators_fixtures.create_groups(self.authenticator) - self.users = authenticators_fixtures.create_users( - self.authenticator, groups=self.groups - ) + self.users = authenticators_fixtures.create_users(self.authenticator, groups=self.groups) self.admins = authenticators_fixtures.create_users( self.authenticator, is_admin=True, groups=self.groups ) @@ -83,98 +83,54 @@ class PermissionsTest(UDSTestCase): self.network = network_fixtures.createNetwork() - def doTestUserPermissions(self, obj, user: models.User): - permissions.add_user_permission( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) + def doTestUserPermissions(self, obj: 'Model', user: models.User) -> None: + permissions.add_user_permission(user, obj, uds.core.types.permissions.PermissionType.NONE) self.assertEqual(models.Permissions.objects.count(), 1) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, objtype.ObjectType.from_model(obj).type) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.NONE) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ), user.is_admin, ) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT), user.is_admin, ) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL), user.is_admin, ) # Add a new permission, must overwrite the previous one - permissions.add_user_permission( - user, obj, uds.core.types.permissions.PermissionType.ALL - ) + permissions.add_user_permission(user, obj, uds.core.types.permissions.PermissionType.ALL) self.assertEqual(models.Permissions.objects.count(), 1) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj)) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.ALL) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL)) # Again, with read - permissions.add_user_permission( - user, obj, uds.core.types.permissions.PermissionType.READ - ) + permissions.add_user_permission(user, obj, uds.core.types.permissions.PermissionType.READ) self.assertEqual(models.Permissions.objects.count(), 1) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj)) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.READ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ)) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.MANAGEMENT), user.is_admin, ) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL), user.is_admin, ) @@ -182,85 +138,47 @@ class PermissionsTest(UDSTestCase): obj.delete() self.assertEqual(models.Permissions.objects.count(), 0) - def doTestGroupPermissions(self, obj, user: models.User): + def doTestGroupPermissions(self, obj: 'Model', user: models.User): group = user.groups.all()[0] - permissions.add_group_permission( - group, obj, uds.core.types.permissions.PermissionType.NONE - ) + permissions.add_group_permission(group, obj, uds.core.types.permissions.PermissionType.NONE) self.assertEqual(models.Permissions.objects.count(), 1) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj)) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.NONE) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) # Admins has all permissions ALWAYS self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ), user.is_admin, ) self.assertEqual( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ), + permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL), user.is_admin, ) - permissions.add_group_permission( - group, obj, uds.core.types.permissions.PermissionType.ALL - ) + permissions.add_group_permission(group, obj, uds.core.types.permissions.PermissionType.ALL) self.assertEqual(models.Permissions.objects.count(), 1) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj)) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.ALL) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL)) # Add user permission, DB must contain both an return ALL - permissions.add_user_permission( - user, obj, uds.core.types.permissions.PermissionType.READ - ) + permissions.add_user_permission(user, obj, uds.core.types.permissions.PermissionType.READ) self.assertEqual(models.Permissions.objects.count(), 2) perm = models.Permissions.objects.all()[0] self.assertEqual(perm.object_type, PermissionsTest.getObjectType(obj)) self.assertEqual(perm.object_id, obj.pk) self.assertEqual(perm.permission, uds.core.types.permissions.PermissionType.ALL) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.NONE - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.READ - ) - ) - self.assertTrue( - permissions.has_access( - user, obj, uds.core.types.permissions.PermissionType.ALL - ) - ) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.NONE)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.READ)) + self.assertTrue(permissions.has_access(user, obj, uds.core.types.permissions.PermissionType.ALL)) # Remove obj, permissions must have gone away obj.delete() @@ -268,150 +186,126 @@ class PermissionsTest(UDSTestCase): # Every tests reverses the DB and recalls setUp - def test_user_auth_permissions_user(self): + def test_user_auth_permissions_user(self) -> None: self.doTestUserPermissions(self.authenticator, self.users[0]) - def test_user_auth_permissions_admin(self): + def test_user_auth_permissions_admin(self) -> None: self.doTestUserPermissions(self.authenticator, self.admins[0]) - def test_user_auth_permissions_staff(self): + def test_user_auth_permissions_staff(self) -> None: self.doTestUserPermissions(self.authenticator, self.staffs[0]) - def test_group_auth_permissions_user(self): + def test_group_auth_permissions_user(self) -> None: self.doTestGroupPermissions(self.authenticator, self.users[0]) - def test_group_auth_permissions_admin(self): + def test_group_auth_permissions_admin(self) -> None: self.doTestGroupPermissions(self.authenticator, self.admins[0]) - def test_group_auth_permissions_staff(self): + def test_group_auth_permissions_staff(self) -> None: self.doTestGroupPermissions(self.authenticator, self.staffs[0]) - def test_user_servicepool_permissions_user(self): + def test_user_servicepool_permissions_user(self) -> None: self.doTestUserPermissions(self.userService.deployed_service, self.users[0]) - def test_user_servicepool_permissions_admin(self): + def test_user_servicepool_permissions_admin(self) -> None: self.doTestUserPermissions(self.userService.deployed_service, self.admins[0]) - def test_user_servicepool_permissions_staff(self): + def test_user_servicepool_permissions_staff(self) -> None: self.doTestUserPermissions(self.userService.deployed_service, self.staffs[0]) - def test_group_servicepool_permissions_user(self): + def test_group_servicepool_permissions_user(self) -> None: self.doTestGroupPermissions(self.userService.deployed_service, self.users[0]) - def test_group_servicepool_permissions_admin(self): + def test_group_servicepool_permissions_admin(self) -> None: self.doTestGroupPermissions(self.userService.deployed_service, self.admins[0]) - def test_group_servicepool_permissions_staff(self): + def test_group_servicepool_permissions_staff(self) -> None: self.doTestGroupPermissions(self.userService.deployed_service, self.staffs[0]) - def test_user_transport_permissions_user(self): + def test_user_transport_permissions_user(self) -> None: self.doTestUserPermissions( - self.userService.deployed_service.transports.first(), self.users[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.users[0] ) - def test_user_transport_permissions_admin(self): + def test_user_transport_permissions_admin(self) -> None: self.doTestUserPermissions( - self.userService.deployed_service.transports.first(), self.admins[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.admins[0] ) - def test_user_transport_permissions_staff(self): + def test_user_transport_permissions_staff(self) -> None: self.doTestUserPermissions( - self.userService.deployed_service.transports.first(), self.staffs[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.staffs[0] ) - def test_group_transport_permissions_user(self): + def test_group_transport_permissions_user(self) -> None: self.doTestGroupPermissions( - self.userService.deployed_service.transports.first(), self.users[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.users[0] ) - def test_group_transport_permissions_admin(self): + def test_group_transport_permissions_admin(self) -> None: self.doTestGroupPermissions( - self.userService.deployed_service.transports.first(), self.admins[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.admins[0] ) - def test_group_transport_permissions_staff(self): + def test_group_transport_permissions_staff(self) -> None: self.doTestGroupPermissions( - self.userService.deployed_service.transports.first(), self.staffs[0] + typing.cast(models.Transport, self.userService.deployed_service.transports.first()), self.staffs[0] ) - def test_user_service_permissions_user(self): - self.doTestUserPermissions( - self.userService.deployed_service.service, self.users[0] - ) + def test_user_service_permissions_user(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service, self.users[0]) - def test_user_service_permissions_admin(self): - self.doTestUserPermissions( - self.userService.deployed_service.service, self.admins[0] - ) + def test_user_service_permissions_admin(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service, self.admins[0]) - def test_user_service_permissions_staff(self): - self.doTestUserPermissions( - self.userService.deployed_service.service, self.staffs[0] - ) + def test_user_service_permissions_staff(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service, self.staffs[0]) - def test_group_service_permissions_user(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service, self.users[0] - ) + def test_group_service_permissions_user(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service, self.users[0]) - def test_group_service_permissions_admin(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service, self.admins[0] - ) + def test_group_service_permissions_admin(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service, self.admins[0]) - def test_group_service_permissions_staff(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service, self.staffs[0] - ) + def test_group_service_permissions_staff(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service, self.staffs[0]) - def test_user_provider_permissions_user(self): - self.doTestUserPermissions( - self.userService.deployed_service.service.provider, self.users[0] - ) + def test_user_provider_permissions_user(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service.provider, self.users[0]) - def test_user_provider_permissions_admin(self): - self.doTestUserPermissions( - self.userService.deployed_service.service.provider, self.admins[0] - ) + def test_user_provider_permissions_admin(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service.provider, self.admins[0]) - def test_user_provider_permissions_staff(self): - self.doTestUserPermissions( - self.userService.deployed_service.service.provider, self.staffs[0] - ) + def test_user_provider_permissions_staff(self) -> None: + self.doTestUserPermissions(self.userService.deployed_service.service.provider, self.staffs[0]) - def test_group_provider_permissions_user(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service.provider, self.users[0] - ) + def test_group_provider_permissions_user(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service.provider, self.users[0]) - def test_group_provider_permissions_admin(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service.provider, self.admins[0] - ) + def test_group_provider_permissions_admin(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service.provider, self.admins[0]) - def test_group_provider_permissions_staff(self): - self.doTestGroupPermissions( - self.userService.deployed_service.service.provider, self.staffs[0] - ) + def test_group_provider_permissions_staff(self) -> None: + self.doTestGroupPermissions(self.userService.deployed_service.service.provider, self.staffs[0]) - def test_user_network_permissions_user(self): + def test_user_network_permissions_user(self) -> None: self.doTestUserPermissions(self.network, self.users[0]) - def test_user_network_permissions_admin(self): + def test_user_network_permissions_admin(self) -> None: self.doTestUserPermissions(self.network, self.admins[0]) - def test_user_network_permissions_staff(self): + def test_user_network_permissions_staff(self) -> None: self.doTestUserPermissions(self.network, self.staffs[0]) - def test_group_network_permissions_user(self): + def test_group_network_permissions_user(self) -> None: self.doTestGroupPermissions(self.network, self.users[0]) - def test_group_network_permissions_admin(self): + def test_group_network_permissions_admin(self) -> None: self.doTestGroupPermissions(self.network, self.admins[0]) - def test_group_network_permissions_staff(self): + def test_group_network_permissions_staff(self) -> None: self.doTestGroupPermissions(self.network, self.staffs[0]) @staticmethod - def getObjectType(obj: typing.Type) -> int: + def getObjectType(obj: typing.Any) -> int: return objtype.ObjectType.from_model(obj).type diff --git a/server/tests/core/util/test_storage.py b/server/tests/core/util/test_storage.py index e4724794a..306ce4047 100644 --- a/server/tests/core/util/test_storage.py +++ b/server/tests/core/util/test_storage.py @@ -41,7 +41,7 @@ VALUE_1 = ['unicode', b'string', {'a': 1, 'b': 2.0}] class StorageTest(UDSTestCase): - def test_storage(self): + def test_storage(self) -> None: strg = storage.Storage(UNICODE_CHARS) strg.put(UNICODE_CHARS, b'chars') @@ -72,7 +72,7 @@ class StorageTest(UDSTestCase): self.assertIsNone(strg.get(b'key')) self.assertIsNone(strg.get_unpickle('pickle')) - def test_storage_as_dict(self): + def test_storage_as_dict(self) -> None: strg = storage.Storage(UNICODE_CHARS) strg.put(UNICODE_CHARS, 'chars') @@ -89,7 +89,7 @@ class StorageTest(UDSTestCase): # because the format is not compatible (with the dict, the values are stored as a tuple, with the original key stored # and with old format, only the value is stored - def test_old_storage_compat(self): + def test_old_storage_compat(self) -> None: models.Storage.objects.create( owner=UNICODE_CHARS, key=storage._old_calculate_key(UNICODE_CHARS.encode(), UNICODE_CHARS.encode()), @@ -105,7 +105,7 @@ class StorageTest(UDSTestCase): key=storage._calculate_key(UNICODE_CHARS.encode(), UNICODE_CHARS.encode()), ) - def test_storage_as_dict_old(self): + def test_storage_as_dict_old(self) -> None: models.Storage.objects.create( owner=UNICODE_CHARS, key=storage._old_calculate_key(UNICODE_CHARS.encode(), UNICODE_CHARS.encode()), diff --git a/server/tests/core/util/test_uniqueid.py b/server/tests/core/util/test_uniqueid.py index a3eec923f..6bcd4b9de 100644 --- a/server/tests/core/util/test_uniqueid.py +++ b/server/tests/core/util/test_uniqueid.py @@ -49,81 +49,81 @@ TEST_MAC_RANGE = '00:50:56:10:00:00-00:50:56:3F:FF:FF' # Testing mac range TEST_MAC_RANGE_FULL = '00:50:56:10:00:00-00:50:56:10:00:10' # Testing mac range for NO MORE MACS -def macToInt(mac): +def mac_to_integer(mac: str) -> int: return int(mac.replace(':', ''), 16) class UniqueIdTest(UDSTestCase): - uidGen: UniqueIDGenerator + uniqueid_generator: UniqueIDGenerator ugidGen: UniqueGIDGenerator macGen: UniqueMacGenerator - nameGen: UniqueNameGenerator + name_generator: UniqueNameGenerator def setUp(self) -> None: - self.uidGen = UniqueIDGenerator('uidg1', 'test', 'test') + self.uniqueid_generator = UniqueIDGenerator('uidg1', 'test', 'test') self.ugidGen = UniqueGIDGenerator('test') self.macGen = UniqueMacGenerator('test') - self.nameGen = UniqueNameGenerator('test') + self.name_generator = UniqueNameGenerator('test') - def test_seq_uid(self): + def test_seq_uid(self) -> None: for x in range(100): - self.assertEqual(self.uidGen.get(), x) + self.assertEqual(self.uniqueid_generator.get(), x) - self.uidGen.free(40) + self.uniqueid_generator.free(40) - self.assertEqual(self.uidGen.get(), 40) + self.assertEqual(self.uniqueid_generator.get(), 40) - def test_release_unique_id(self): + def test_release_unique_id(self) -> None: for _ in range(100): - self.uidGen.get() + self.uniqueid_generator.get() - self.assertEqual(self.uidGen.get(), 100) + self.assertEqual(self.uniqueid_generator.get(), 100) - self.uidGen.release() # Clear ups + self.uniqueid_generator.release() # Clear ups - self.assertEqual(self.uidGen.get(), 0) + self.assertEqual(self.uniqueid_generator.get(), 0) - def test_release_older_unique_id(self): + def test_release_older_unique_id(self) -> None: NUM = 100 for i in range(NUM): - self.assertEqual(self.uidGen.get(), i) + self.assertEqual(self.uniqueid_generator.get(), i) stamp = sql_stamp_seconds() + 1 time.sleep(2) for i in range(NUM): - self.assertEqual(self.uidGen.get(), i + NUM) + self.assertEqual(self.uniqueid_generator.get(), i + NUM) - self.uidGen.release_older_than(stamp) # Clear ups older than 0 seconds ago + self.uniqueid_generator.release_older_than(stamp) # Clear ups older than 0 seconds ago for i in range(NUM): - self.assertEqual(self.uidGen.get(), i) + self.assertEqual(self.uniqueid_generator.get(), i) # from NUM to NUM*2-1 (both included) are still there, so we should get 200 - self.assertEqual(self.uidGen.get(), NUM * 2) - self.assertEqual(self.uidGen.get(), NUM * 2 + 1) + self.assertEqual(self.uniqueid_generator.get(), NUM * 2) + self.assertEqual(self.uniqueid_generator.get(), NUM * 2 + 1) - def test_gid(self): + def test_gid(self) -> None: for x in range(100): self.assertEqual(self.ugidGen.get(), f'uds{x:08d}') - def test_gid_basename(self): + def test_gid_basename(self) -> None: self.ugidGen.set_basename('mar') for x in range(100): self.assertEqual(self.ugidGen.get(), f'mar{x:08d}') - def test_mac(self): - start, end = TEST_MAC_RANGE.split('-') # pylint: disable=unused-variable + def test_mac(self) -> None: + start, _end = TEST_MAC_RANGE.split('-') # pylint: disable=unused-variable self.assertEqual(self.macGen.get(TEST_MAC_RANGE), start) - starti = macToInt(start) + 1 # We have already got 1 element + starti = mac_to_integer(start) + 1 # We have already got 1 element lst = [start] for x in range(400): mac = self.macGen.get(TEST_MAC_RANGE) - self.assertEqual(macToInt(mac), starti + x) + self.assertEqual(mac_to_integer(mac), starti + x) lst.append(mac) for x in lst: @@ -131,37 +131,37 @@ class UniqueIdTest(UDSTestCase): self.assertEqual(self.macGen.get(TEST_MAC_RANGE), start) - def test_mac_full(self): + def test_mac_full(self) -> None: start, end = TEST_MAC_RANGE_FULL.split('-') - length = macToInt(end) - macToInt(start) + 1 + length = mac_to_integer(end) - mac_to_integer(start) + 1 - starti = macToInt(start) + starti = mac_to_integer(start) for x in range(length): - self.assertEqual(macToInt(self.macGen.get(TEST_MAC_RANGE_FULL)), starti + x) + self.assertEqual(mac_to_integer(self.macGen.get(TEST_MAC_RANGE_FULL)), starti + x) for x in range(20): self.assertEqual(self.macGen.get(TEST_MAC_RANGE_FULL), '00:00:00:00:00:00') - def test_name(self): - lst = [] + def test_name(self) -> None: + lst: list[str] = [] num = 0 for length in range(2, 10): for x in range(20): - name = self.nameGen.get('test', length=length) + name = self.name_generator.get('test', length=length) lst.append(name) self.assertEqual(name, f'test{num:0{length}d}'.format(num, width=length)) num += 1 for x in lst: - self.nameGen.free('test', x) + self.name_generator.free('test', x) - self.assertEqual(self.nameGen.get('test', length=1), 'test0') + self.assertEqual(self.name_generator.get('test', length=1), 'test0') - def test_name_full(self): + def test_name_full(self) -> None: for _ in range(10): - self.nameGen.get('test', length=1) + self.name_generator.get('test', length=1) with self.assertRaises(KeyError): - self.nameGen.get('test', length=1) + self.name_generator.get('test', length=1) diff --git a/server/tests/core/workers/test_assigned_unused.py b/server/tests/core/workers/test_assigned_unused.py index c4759f1a7..37be74896 100644 --- a/server/tests/core/workers/test_assigned_unused.py +++ b/server/tests/core/workers/test_assigned_unused.py @@ -47,13 +47,13 @@ from ...fixtures import services as fixtures_services class AssignedAndUnusedTest(UDSTestCase): userServices: list[models.UserService] - def setUp(self): + def setUp(self) -> None: config.GlobalConfig.CHECK_UNUSED_TIME.set('600') AssignedAndUnused.setup() # All created user services has "in_use" to False, os_state and state to USABLE self.userServices = fixtures_services.create_cache_testing_userservices(count=32) - def test_assigned_unused(self): + def test_assigned_unused(self) -> None: for us in self.userServices: # Update state date to now us.set_state(State.USABLE) # Set now, should not be removed diff --git a/server/tests/core/workers/test_hanged_userservice_cleaner.py b/server/tests/core/workers/test_hanged_userservice_cleaner.py index 8bcfeec08..8a4bb775f 100644 --- a/server/tests/core/workers/test_hanged_userservice_cleaner.py +++ b/server/tests/core/workers/test_hanged_userservice_cleaner.py @@ -50,7 +50,7 @@ TEST_SERVICES = 5 * 5 # Ensure multiple of 5 for testing class HangedCleanerTest(UDSTestCase): userServices: list[models.UserService] - def setUp(self): + def setUp(self) -> None: config.GlobalConfig.MAX_INITIALIZING_TIME.set(MAX_INIT) config.GlobalConfig.MAX_REMOVAL_TIME.set(MAX_INIT) HangedCleaner.setup() @@ -82,7 +82,7 @@ class HangedCleanerTest(UDSTestCase): ) us.save(update_fields=['state', 'os_state', 'state_date']) - def test_hanged_cleaner(self): + def test_hanged_cleaner(self) -> None: # At start, there is no "removable" user services cleaner = HangedCleaner(Environment.testing_environment()) cleaner.run() diff --git a/server/tests/core/workers/test_stuck_cleaner.py b/server/tests/core/workers/test_stuck_cleaner.py index 4ddf1828e..bb9f7feff 100644 --- a/server/tests/core/workers/test_stuck_cleaner.py +++ b/server/tests/core/workers/test_stuck_cleaner.py @@ -78,7 +78,7 @@ class StuckCleanerTest(UDSTestCase): us.save(update_fields=['state_date', 'state', 'os_state']) - def test_worker_outdated(self): + def test_worker_outdated(self) -> None: count = UserService.objects.count() cleaner = StuckCleaner(Environment.testing_environment()) cleaner.run() @@ -86,7 +86,7 @@ class StuckCleanerTest(UDSTestCase): UserService.objects.count(), count // 4 ) # 3/4 of user services should be removed - def test_worker_not_outdated(self): + def test_worker_not_outdated(self) -> None: # Fix state_date to be less than 1 day for all user services for us in self.userServices: us.state_date = datetime.datetime.now() - datetime.timedelta( diff --git a/server/tests/middleware/test_redirect.py b/server/tests/middleware/test_redirect.py index b7ca62c45..7fc6df137 100644 --- a/server/tests/middleware/test_redirect.py +++ b/server/tests/middleware/test_redirect.py @@ -44,7 +44,7 @@ class RedirectMiddlewareTest(test.UDSTransactionTestCase): """ Test client functionality """ - def test_redirect(self): + def test_redirect(self) -> None: RedirectMiddlewareTest.add_middleware('uds.middleware.redirect.RedirectMiddleware') page = 'https://testserver' + reverse('page.index') response = self.client.get('/', secure=False) diff --git a/server/tests/osmanagers/windows_randompass_osmanager copy/__init__.py b/server/tests/osmanagers/windows_randompass_osmanager/__init__.py similarity index 100% rename from server/tests/osmanagers/windows_randompass_osmanager copy/__init__.py rename to server/tests/osmanagers/windows_randompass_osmanager/__init__.py diff --git a/server/tests/osmanagers/windows_randompass_osmanager copy/test_serialization.py b/server/tests/osmanagers/windows_randompass_osmanager/test_serialization.py similarity index 100% rename from server/tests/osmanagers/windows_randompass_osmanager copy/test_serialization.py rename to server/tests/osmanagers/windows_randompass_osmanager/test_serialization.py diff --git a/server/tests/services/physical_machines/test_serialization_deployment.py b/server/tests/services/physical_machines/test_serialization_deployment.py index b8ca6e6cd..cfce7f7a7 100644 --- a/server/tests/services/physical_machines/test_serialization_deployment.py +++ b/server/tests/services/physical_machines/test_serialization_deployment.py @@ -24,7 +24,7 @@ from uds.services.PhysicalMachines import provider, deployment class IPMachineUserServiceSerializationTest(UDSTestCase): - def test_marshalling(self): + def test_marshalling(self) -> None: obj = deployment.OldIPSerialData() obj._ip = '1.1.1.1' obj._state = 'state' @@ -37,7 +37,7 @@ class IPMachineUserServiceSerializationTest(UDSTestCase): data = obj.marshal() - instance = deployment.IPMachineUserService(environment=Environment.testing_environment(), service=None) + instance = deployment.IPMachineUserService(environment=Environment.testing_environment(), service=None) # type: ignore # service is not used instance.unmarshal(data) marshaled_data = instance.marshal() @@ -53,7 +53,7 @@ class IPMachineUserServiceSerializationTest(UDSTestCase): _check_fields(instance) # Reunmarshall again and check that remarshalled flag is not set - instance = deployment.IPMachineUserService(environment=Environment.testing_environment(), service=None) + instance = deployment.IPMachineUserService(environment=Environment.testing_environment(), service=None) # type: ignore # service is not used instance.unmarshal(marshaled_data) self.assertFalse(instance.needs_upgrade()) diff --git a/server/tests/services/xen/test_serialization_deployment.py b/server/tests/services/xen/test_serialization_deployment.py index 579745b15..950a577d3 100644 --- a/server/tests/services/xen/test_serialization_deployment.py +++ b/server/tests/services/xen/test_serialization_deployment.py @@ -96,7 +96,7 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase): environment = Environment.testing_environment() def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment: - instance = Deployment(environment=environment, service=None) + instance = Deployment(environment=environment, service=None) # type: ignore # service is not used if unmarshal_data: instance.unmarshal(unmarshal_data) return instance @@ -127,7 +127,7 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase): environment.storage.put_pickle('queue', TEST_QUEUE) def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment: - instance = Deployment(environment=environment, service=None) + instance = Deployment(environment=environment, service=None) # type: ignore # service is not used if unmarshal_data: instance.unmarshal(unmarshal_data) return instance @@ -171,6 +171,6 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase): # This test is designed to ensure that all fields are autoserializable # If some field is added or removed, this tests will warn us about it to fix the rest of the related tests with Environment.temporary_environment() as env: - instance = Deployment(environment=env, service=None) + instance = Deployment(environment=env, service=None) # type: ignore # service is not used self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS) diff --git a/server/tests/utils/rest/__init__.py b/server/tests/utils/rest/__init__.py index d88aae1f4..6dceacadf 100644 --- a/server/tests/utils/rest/__init__.py +++ b/server/tests/utils/rest/__init__.py @@ -136,11 +136,11 @@ def random_value( class RestStruct: - def __init__(self, **kwargs) -> None: + def __init__(self, **kwargs: typing.Any) -> None: for k, v in kwargs.items(): setattr(self, k, v) - def as_dict(self, **kwargs) -> dict[str, typing.Any]: + def as_dict(self, **kwargs: typing.Any) -> dict[str, typing.Any]: # Use kwargs to override values res = {k: kwargs.get(k, getattr(self, k)) for k in self.__annotations__} # pylint: disable=no-member # Remove None values for optional fields @@ -158,7 +158,7 @@ class RestStruct: } @classmethod - def random_create(cls, **kwargs) -> 'RestStruct': + def random_create(cls, **kwargs: typing.Any) -> 'RestStruct': # Use kwargs to override values # Extract type from annotations return cls(**{k: random_value(v, kwargs.get(k, None)) for k, v in cls.__annotations__.items()}) diff --git a/server/tests/utils/rest/assertions.py b/server/tests/utils/rest/assertions.py index 934005ecc..8f17a26dc 100644 --- a/server/tests/utils/rest/assertions.py +++ b/server/tests/utils/rest/assertions.py @@ -44,8 +44,8 @@ logger = logging.getLogger(__name__) def assertUserIs( user: models.User, compare_to: collections.abc.Mapping[str, typing.Any], - compare_uuid=False, - compare_password=False, + compare_uuid: bool=False, + compare_password: bool=False, ) -> bool: ignore_fields = ['password', 'groups', 'mfa_data', 'last_access', 'role'] @@ -100,7 +100,7 @@ def assertUserIs( def assertGroupIs( - group: models.Group, compare_to: collections.abc.Mapping[str, typing.Any], compare_uuid=False + group: models.Group, compare_to: collections.abc.Mapping[str, typing.Any], compare_uuid: bool=False ) -> bool: ignore_fields = ['groups', 'users', 'is_meta', 'type', 'pools'] @@ -143,7 +143,7 @@ def assertGroupIs( def assertServicePoolIs( pool: models.ServicePool, compare_to: collections.abc.Mapping[str, typing.Any], - compare_uuid=False, + compare_uuid: bool=False, ) -> bool: ignore_fields = [ 'tags', diff --git a/server/tests/utils/test.py b/server/tests/utils/test.py index 117138da3..9df54e6c9 100644 --- a/server/tests/utils/test.py +++ b/server/tests/utils/test.py @@ -26,8 +26,9 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ +# pyright: reportUnknownMemberType=false import typing import collections.abc import logging @@ -50,11 +51,12 @@ class UDSHttpResponse(HttpResponse): """ Custom response class to be able to access the response content """ + url: str def __init__(self, content: bytes, *args: typing.Any, **kwargs: typing.Any) -> None: super().__init__(content, *args, **kwargs) - + def json(self) -> typing.Any: return super().json() @@ -100,7 +102,7 @@ class UDSClientMixin: kwargs['REMOTE_ADDR'] = '127.0.0.1' elif self.ip_version == 6: kwargs['REMOTE_ADDR'] = '::1' - + kwargs['headers'] = self.uds_headers def compose_rest_url(self, method: str) -> str: @@ -164,7 +166,7 @@ class UDSClient(UDSClientMixin, Client): return self.delete(self.compose_rest_url(method), *args, **kwargs) -class UDSAsyncClient(UDSClientMixin, AsyncClient): +class UDSAsyncClient(UDSClientMixin, AsyncClient): # type: ignore # Django stubs do not include AsyncClient def __init__( self, enforce_csrf_checks: bool = False, @@ -173,7 +175,7 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient): ): # Instantiate the client and add basic user agent - AsyncClient.__init__(self, enforce_csrf_checks, raise_request_exception) + AsyncClient.__init__(self, enforce_csrf_checks, raise_request_exception) # pyright: ignore UDSClientMixin.initialize(self) # and required UDS cookie @@ -184,7 +186,7 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient): request = request.copy() # Add headers request.update(self.uds_headers) - return await super().request(**request) + return await super().request(**request) # pyright: ignore # pylint: disable=invalid-overridden-method async def get(self, *args: typing.Any, **kwargs: typing.Any) -> 'UDSHttpResponse': @@ -244,7 +246,7 @@ class UDSTestCaseMixin: pass # Not present -class UDSTestCase(UDSTestCaseMixin, TestCase): +class UDSTestCase(UDSTestCaseMixin, TestCase): # pyright: ignore # Overrides superclass client @classmethod def setUpClass(cls) -> None: super().setUpClass() @@ -253,7 +255,8 @@ class UDSTestCase(UDSTestCaseMixin, TestCase): def create_environment(self) -> Environment: return Environment.testing_environment() -class UDSTransactionTestCase(UDSTestCaseMixin, TransactionTestCase): + +class UDSTransactionTestCase(UDSTestCaseMixin, TransactionTestCase): # pyright: ignore # superclass client @classmethod def setUpClass(cls) -> None: super().setUpClass() diff --git a/server/tests/utils/web/__init__.py b/server/tests/utils/web/__init__.py index c78e3505b..649b07ff4 100644 --- a/server/tests/utils/web/__init__.py +++ b/server/tests/utils/web/__init__.py @@ -77,7 +77,7 @@ def logout(caller: SimpleTestCase, client: Client, auth_token: str) -> None: response = client.get( '/uds/rest/auth/logout', content_type='application/json', - **{consts.auth.AUTH_TOKEN_HEADER: auth_token} # type: ignore + **{consts.auth.AUTH_TOKEN_HEADER: auth_token} # pyright: ignore ) caller.assertEqual(response.status_code, 200, 'Logout') caller.assertEqual(response.json(), {'result': 'ok'}, 'Logout') diff --git a/server/tests/web/user/test_login_logout.py b/server/tests/web/user/test_login_logout.py index 42f77a936..209473b6c 100644 --- a/server/tests/web/user/test_login_logout.py +++ b/server/tests/web/user/test_login_logout.py @@ -56,7 +56,7 @@ class WebLoginLogoutTest(test.WEBTestCase): response = typing.cast('HttpResponse', self.client.get('/uds/utility/uds.js')) self.assertContains(response, '"errors": ["Access denied"]', status_code=200) - def test_login_logout_success(self): + def test_login_logout_success(self) -> None: """ Test login and logout """ @@ -117,7 +117,7 @@ class WebLoginLogoutTest(test.WEBTestCase): response = self.do_login('invalid', rootpass, auth.uuid) self.assertInvalidLogin(response) - def test_login_valid_user_no_group(self): + def test_login_valid_user_no_group(self) -> None: user = fixtures_authenticators.create_users( fixtures_authenticators.create_authenticator(), )[0] @@ -147,7 +147,7 @@ class WebLoginLogoutTest(test.WEBTestCase): self.assertEqual(models.Log.objects.count(), 12) - def test_login_invalid_user(self): + def test_login_invalid_user(self) -> None: user = fixtures_authenticators.create_users( fixtures_authenticators.create_authenticator(), )[0]