1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Fixed some typos left behind on last refactorization of code

This commit is contained in:
Adolfo Gómez García 2023-12-03 23:29:22 +01:00
parent 58a50c6983
commit 3e0099ed16
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 35 additions and 34 deletions

View File

@ -49,7 +49,7 @@ from django.urls import reverse
from django.utils.translation import gettext as _
from uds.core import auths, types, exceptions
from uds.core import auths, types, exceptions, consts
from uds.core.types.request import ExtendedHttpRequest
from uds.core.util import log
from uds.core.util import net
@ -131,8 +131,9 @@ def getRootUser() -> models.User:
user.manager = models.Authenticator() # type: ignore
# Fake overwrite some methods, a bit cheating? maybe? :)
user.getGroups = lambda: [] # type: ignore
user.updateLastAccess = lambda: None # type: ignore
user.logout = lambda x: SUCCESS_AUTH # type: ignore
user.updateLastAccess = lambda: None
# Override logout method to do nothing for this user
user.logout = lambda request: types.auth.SUCCESS_AUTH
return user

View File

@ -57,6 +57,7 @@ import struct
from cryptography import fernet
from django.conf import settings
from requests import get
# pylint: disable=too-few-public-methods
@ -85,6 +86,7 @@ CRC_SIZE: typing.Final[int] = 4
# Packing data struct
pack_struct = struct.Struct('<HHI')
# Helper functions
def fernet_key(crypt_key: bytes) -> str:
"""Generate key from password and seed
@ -97,6 +99,7 @@ def fernet_key(crypt_key: bytes) -> str:
# Generate an URL-Safe base64 encoded 32 bytes key for Fernet
return base64.b64encode(hashlib.sha256(crypt_key).digest()).decode()
# pylint: disable=unnecessary-dunder-call
class _SerializableField(typing.Generic[T]):
name: str
@ -129,13 +132,18 @@ class _SerializableField(typing.Generic[T]):
return getattr(instance, '_fields').get(self.name, self._default())
if self.default is None:
raise AttributeError(f"Field {self.name} is not set")
return self._default()
# Set default using setter
self.__set__(instance, self._default())
return getattr(instance, '_fields')[self.name]
def __set__(self, instance: 'AutoSerializable', value: T) -> None:
# If type is float and value is int, convert it
# Or if type is int and value is float, convert it
if self.type in (float, int) and isinstance(value, (float, int)):
value = self.type(value)
if not isinstance(value, self.type):
raise TypeError(
f"Field {self.name} cannot be set to {value} (type {self.type.__name__})"
)
# Allow int to float conversion and viceversa
raise TypeError(f"Field {self.name} cannot be set to {value} (type {self.type.__name__})")
if not hasattr(instance, '_fields'):
setattr(instance, '_fields', {})
getattr(instance, '_fields')[self.name] = value
@ -177,7 +185,7 @@ class _SerializableField(typing.Generic[T]):
# Integer field
class IntField(_SerializableField[int]):
class IntegerField(_SerializableField[int]):
def __init__(self, default: int = 0):
super().__init__(int, default)
@ -191,6 +199,7 @@ class FloatField(_SerializableField[float]):
def __init__(self, default: float = 0.0):
super().__init__(float, default)
class BoolField(_SerializableField[bool]):
def __init__(self, default: bool = False):
super().__init__(bool, default)
@ -201,6 +210,7 @@ class BoolField(_SerializableField[bool]):
def unmarshal(self, instance: 'AutoSerializable', data: bytes) -> None:
self.__set__(instance, data == b'1')
class ListField(_SerializableField[typing.List]):
"""List field
@ -210,9 +220,7 @@ class ListField(_SerializableField[typing.List]):
def __init__(
self,
default: typing.Union[
typing.List, typing.Callable[[], typing.List]
] = lambda: [],
default: typing.Union[typing.List, typing.Callable[[], typing.List]] = lambda: [],
):
super().__init__(list, default)
@ -232,9 +240,7 @@ class DictField(_SerializableField[typing.Dict]):
def __init__(
self,
default: typing.Union[
typing.Dict, typing.Callable[[], typing.Dict]
] = lambda: {},
default: typing.Union[typing.Dict, typing.Callable[[], typing.Dict]] = lambda: {},
):
super().__init__(dict, default)
@ -256,7 +262,7 @@ class PasswordField(StringField):
def __init__(self, default: str = '', crypt_key: str = ''):
super().__init__(default)
self._crypt_key = crypt_key
self._crypt_key = crypt_key or settings.SECRET_KEY[:32]
def _encrypt(self, value: str) -> bytes:
"""Encrypt a password
@ -321,7 +327,7 @@ class AutoSerializable(metaclass=_FieldNameSetter):
Example:
>>> class Test(SerializableFields):
... a = IntField()
... a = IntegerField()
... b = StrField()
... c = FloatField()
... d = ListField(defalut=lambda: [1, 2, 3])
@ -398,9 +404,7 @@ class AutoSerializable(metaclass=_FieldNameSetter):
header = data[: len(HEADER_BASE) + CRC_SIZE]
# Extract checksum
checksum = int.from_bytes(
header[len(HEADER_BASE) : len(HEADER_BASE) + 4], 'big'
)
checksum = int.from_bytes(header[len(HEADER_BASE) : len(HEADER_BASE) + 4], 'big')
# Unprocess data
data = self.unprocess_data(header, data[len(header) :])
@ -417,14 +421,7 @@ class AutoSerializable(metaclass=_FieldNameSetter):
name, type_name, value = (
data[8 : 8 + name_len].decode(),
data[8 + name_len : 8 + name_len + type_name_len].decode(),
data[
8
+ name_len
+ type_name_len : 8
+ name_len
+ type_name_len
+ data_len
],
data[8 + name_len + type_name_len : 8 + name_len + type_name_len + data_len],
)
# Add to fields
fields[name] = (type_name, value)
@ -441,7 +438,9 @@ class AutoSerializable(metaclass=_FieldNameSetter):
else:
logger.warning(
'Field %s has wrong type in unmarshalled data (should be %s and is %s',
v.name, fields[v.name][0], v.__class__.__name__,
v.name,
fields[v.name][0],
v.__class__.__name__,
)
def __eq__(self, other: typing.Any) -> bool:

View File

@ -117,7 +117,7 @@ class ServerEventsPingTest(rest.test.RESTTestCase):
# Ensure stat is valid right now
statsResponse = types.servers.ServerStats.fromDict(server_stats)
self.assertTrue(statsResponse.is_valid)
statsResponse = types.servers.ServerStats.fromDict(server_stats, stamp=getSqlStamp() - consts.DEFAULT_CACHE_TIMEOUT - 1)
statsResponse = types.servers.ServerStats.fromDict(server_stats, stamp=getSqlStamp() - consts.system.DEFAULT_CACHE_TIMEOUT - 1)
self.assertFalse(statsResponse.is_valid)
def test_event_ping_without_stats(self) -> None:

View File

@ -42,7 +42,7 @@ UNICODE_CHARS_2 = 'ñöçóá^(€íöè)'
class AutoSerializableClass(auto_serializable.AutoSerializable):
int_field = auto_serializable.IntField()
int_field = auto_serializable.IntegerField()
str_field = auto_serializable.StringField()
float_field = auto_serializable.FloatField()
bool_field = auto_serializable.BoolField()
@ -52,7 +52,7 @@ class AutoSerializableClass(auto_serializable.AutoSerializable):
class AutoSerializableCompressedClass(auto_serializable.AutoSerializableCompressed):
int_field = auto_serializable.IntField()
int_field = auto_serializable.IntegerField()
str_field = auto_serializable.StringField()
float_field = auto_serializable.FloatField()
bool_field = auto_serializable.BoolField()
@ -62,7 +62,7 @@ class AutoSerializableCompressedClass(auto_serializable.AutoSerializableCompress
class AutoSerializableEncryptedClass(auto_serializable.AutoSerializableEncrypted):
int_field = auto_serializable.IntField()
int_field = auto_serializable.IntegerField()
str_field = auto_serializable.StringField()
float_field = auto_serializable.FloatField()
bool_field = auto_serializable.BoolField()

View File

@ -177,6 +177,7 @@ class ServiceCacheUpdaterTest(UDSTestCase):
# Delete all userServices
self.servicePool.userServices.all().delete()
# Now, set provider limit to 0. Minumum aceptable is 1, so 1 will be created
# We again allow masUserServices to be zero (meaning that no service will be created)
# This allows us to "honor" some external providers that, in some cases, will not have services available...
TestServiceCache.maxUserServices = 0
self.assertEqual(self.runCacheUpdater(self.servicePool.cache_l1_srvs + 10), 1)
self.assertEqual(self.runCacheUpdater(self.servicePool.cache_l1_srvs + 10), 0)