1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Son minimal cosmetic refactorizations

This commit is contained in:
Adolfo Gómez García 2024-06-08 06:08:33 +02:00
parent 7b7e9193c0
commit 26f48d22f8
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
18 changed files with 83 additions and 69 deletions

View File

@ -78,10 +78,10 @@ class GuiTest(UDSTestCase):
# 1. Empty list
# 2.- single string
# 3.- A list of strings
self.assertEqual(ensure.is_list([]), [])
self.assertEqual(ensure.is_list('aaaa'), ['aaaa'])
self.assertEqual(ensure.is_list(['a', 'b']), ['a', 'b'])
self.assertEqual(ensure.is_list(1), [1])
self.assertEqual(ensure.as_list([]), [])
self.assertEqual(ensure.as_list('aaaa'), ['aaaa'])
self.assertEqual(ensure.as_list(['a', 'b']), ['a', 'b'])
self.assertEqual(ensure.as_list(1), [1])
def test_choice_image(self) -> None:
# id, text, and base64 image

View File

@ -42,31 +42,31 @@ logger = logging.getLogger(__name__)
class EnsureTest(UDSTestCase):
def test_ensure_list(self) -> None:
self.assertEqual(ensure.is_list([]), [])
self.assertEqual(ensure.is_list([1, 2, 3]), [1, 2, 3])
self.assertEqual(ensure.is_list((1, 2, 3)), [1, 2, 3])
self.assertEqual(ensure.is_list(1), [1])
self.assertEqual(ensure.is_list('111'), ['111'])
self.assertEqual(ensure.is_list(None), [])
self.assertEqual(ensure.is_list({}), [])
self.assertEqual(ensure.is_list({1, 2, 3}), [1, 2, 3])
self.assertEqual(ensure.as_list([]), [])
self.assertEqual(ensure.as_list([1, 2, 3]), [1, 2, 3])
self.assertEqual(ensure.as_list((1, 2, 3)), [1, 2, 3])
self.assertEqual(ensure.as_list(1), [1])
self.assertEqual(ensure.as_list('111'), ['111'])
self.assertEqual(ensure.as_list(None), [])
self.assertEqual(ensure.as_list({}), [])
self.assertEqual(ensure.as_list({1, 2, 3}), [1, 2, 3])
def test_ensure_iterable(self) -> None:
self.assertEqual(list(ensure.is_iterable([])), [])
self.assertIsInstance(ensure.is_iterable([]), typing.Iterator)
self.assertEqual(list(ensure.is_iterable([1, 2, 3])), [1, 2, 3])
self.assertIsInstance(ensure.is_iterable([1, 2, 3]), typing.Iterator)
self.assertEqual(list(ensure.is_iterable((1, 2, 3))), [1, 2, 3])
self.assertIsInstance(ensure.is_iterable((1, 2, 3)), typing.Iterator)
self.assertEqual(list(ensure.is_iterable(1)), [1])
self.assertIsInstance(ensure.is_iterable(1), typing.Iterator)
self.assertEqual(list(ensure.is_iterable('111')), ['111'])
self.assertIsInstance(ensure.is_iterable('111'), typing.Iterator)
self.assertEqual(list(ensure.is_iterable(None)), [])
self.assertIsInstance(ensure.is_iterable(None), typing.Iterator)
self.assertEqual(list(ensure.is_iterable({})), [])
self.assertIsInstance(ensure.is_iterable({}), typing.Iterator)
self.assertEqual(list(ensure.is_iterable({1, 2, 3})), [1, 2, 3])
self.assertIsInstance(ensure.is_iterable({1, 2, 3}), typing.Iterator)
self.assertEqual(list(ensure.as_iterable([])), [])
self.assertIsInstance(ensure.as_iterable([]), typing.Iterator)
self.assertEqual(list(ensure.as_iterable([1, 2, 3])), [1, 2, 3])
self.assertIsInstance(ensure.as_iterable([1, 2, 3]), typing.Iterator)
self.assertEqual(list(ensure.as_iterable((1, 2, 3))), [1, 2, 3])
self.assertIsInstance(ensure.as_iterable((1, 2, 3)), typing.Iterator)
self.assertEqual(list(ensure.as_iterable(1)), [1])
self.assertIsInstance(ensure.as_iterable(1), typing.Iterator)
self.assertEqual(list(ensure.as_iterable('111')), ['111'])
self.assertIsInstance(ensure.as_iterable('111'), typing.Iterator)
self.assertEqual(list(ensure.as_iterable(None)), [])
self.assertIsInstance(ensure.as_iterable(None), typing.Iterator)
self.assertEqual(list(ensure.as_iterable({})), [])
self.assertIsInstance(ensure.as_iterable({}), typing.Iterator)
self.assertEqual(list(ensure.as_iterable({1, 2, 3})), [1, 2, 3])
self.assertIsInstance(ensure.as_iterable({1, 2, 3}), typing.Iterator)

View File

@ -45,7 +45,6 @@ def random_string(size: int = 6, chars: typing.Optional[str] = None) -> str:
random.choice(chars) for _ in range(size) # nosec: Not used for cryptography, just for testing
)
def random_utf8_string(size: int = 6) -> str:
# Generate a random utf-8 string of length "length"
# some utf-8 non ascii chars are generated, but not all of them
@ -66,8 +65,18 @@ def random_ip() -> str:
)
def random_mac() -> str:
return ':'.join(random_string(2, '0123456789ABCDEF') for _ in range(6))
def random_mac(mac_range: typing.Optional[str] = None) -> str:
if not mac_range:
return ':'.join(random_string(2, '0123456789ABCDEF') for _ in range(6))
else: # Mac range is like 00:15:5D:10:00:00-00:15:5D:FF:FF:FF
start, end = mac_range.split('-')
# Convert to integers
start = start.split(':')
end = end.split(':')
start_n = int(''.join(start), 16)
end_n = int(''.join(end), 16)
mac = random.randint(start_n, end_n)
return ':'.join(f'{mac:012X}'[i:i + 2] for i in range(0, 12, 2))
def limited_iterator(

View File

@ -174,7 +174,7 @@ class Tickets(Handler):
# Some machines needs password, depending on configuration
groupIds: list[str] = []
for groupName in ensure.is_list(self.get_param('groups')):
for groupName in ensure.as_list(self.get_param('groups')):
try:
groupIds.append(auth.groups.get(name=groupName).uuid or '')
except Exception:

View File

@ -497,7 +497,7 @@ class RegexLdap(auths.Authenticator):
try:
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportUnknownMemberType
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportUnknownMemberType
@ -520,7 +520,7 @@ class RegexLdap(auths.Authenticator):
try:
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportUnknownMemberType
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportUnknownMemberType
@ -546,7 +546,7 @@ class RegexLdap(auths.Authenticator):
try:
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportUnknownMemberType
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportUnknownMemberType

View File

@ -507,7 +507,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
try:
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues
@ -522,7 +522,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues
@ -537,7 +537,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues
@ -554,7 +554,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues
@ -571,7 +571,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
if (
len(
ensure.is_list(
ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues
@ -588,7 +588,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
)
)
res = ensure.is_list(
res = ensure.as_list(
con.search_ext_s( # pyright: ignore reportGeneralTypeIssues
base=self.ldap_base.as_str(),
scope=ldaputil.SCOPE_SUBTREE, # pyright: ignore reportGeneralTypeIssues

View File

@ -175,7 +175,7 @@ class ServerManager(metaclass=singleton.Singleton):
self.increment_unmanaged_usage(best_with_counter[0].uuid)
best = (
best_with_counter[0],
types.servers.ServerStats.empty(),
types.servers.ServerStats.null(),
)
# If best was locked, notify it (will be notified again on assign)

View File

@ -243,7 +243,7 @@ class ServerStats:
}
@staticmethod
def empty() -> 'ServerStats':
def null() -> 'ServerStats':
return ServerStats()
def __str__(self) -> str:
@ -261,7 +261,7 @@ class ServerCounter(typing.NamedTuple):
if data is None:
return None
return ServerCounter(*ensure.is_iterable(data))
return ServerCounter(*ensure.as_iterable(data))
@staticmethod
def null() -> 'ServerCounter':

View File

@ -1552,7 +1552,7 @@ class UserInterface(metaclass=UserInterfaceType):
fields: gui.ValuesDictType = {}
for fld, fld_gui in self._gui.items():
if fld_gui.is_type(types.ui.FieldType.EDITABLELIST, types.ui.FieldType.MULTICHOICE):
fields[fld] = ensure.is_list(fld_gui.value)
fields[fld] = ensure.as_list(fld_gui.value)
else:
fields[fld] = fld_gui.value
logger.debug('Values Dict: %s', fields)

View File

@ -104,7 +104,7 @@ def process_regex_field(
if '+' in attr_name:
attrs_list = attr_name.split('+')
# Check all attributes are present, and has only one value
attrs_values = [ensure.is_list(attributes.get(a, [''])) for a in attrs_list]
attrs_values = [ensure.as_list(attributes.get(a, [''])) for a in attrs_list]
if not all([len(v) <= 1 for v in attrs_values]):
logger.warning(
'Attribute %s do not has exactly one value, skipping %s', attr_name, line
@ -115,9 +115,9 @@ def process_regex_field(
elif '**' in attr_name:
# Prepend the value after : to attribute value before :
attr, prependable = attr_name.split('**')
val = [prependable + a for a in ensure.is_list(attributes.get(attr, []))]
val = [prependable + a for a in ensure.as_list(attributes.get(attr, []))]
else:
val = ensure.is_list(attributes.get(attr_name, []))
val = ensure.as_list(attributes.get(attr_name, []))
return val
except Exception as e:
logger.warning('Error processing attribute %s (%s): %s', attr_name, attributes, e)

View File

@ -32,7 +32,7 @@ import typing
import collections.abc
def is_list(obj: typing.Any) -> list[typing.Any]:
def as_list(obj: typing.Any) -> list[typing.Any]:
"""
Checks if the given object is a list or can be converted into a list.
@ -40,24 +40,27 @@ def is_list(obj: typing.Any) -> list[typing.Any]:
obj (Any): The object to check.
Returns:
list[Any]: If the object is a list, it is returned as is.
If the object is a string or bytes, it is wrapped in a list and returned.
list[Any]: If the object is a list, it is returned as is.
If the object is a string or bytes, it is wrapped in a list and returned.
If the object is not iterable, a list containing the object is returned.
"""
if not obj:
if obj is None:
return []
if isinstance(obj, (str, bytes)):
if isinstance(obj, (str, bytes, dict)):
return [obj]
if isinstance(obj, list):
return typing.cast(list[typing.Any], obj)
try:
return list(obj)
except Exception: # Not iterable (list will fail)
return [obj]
def is_iterable(obj: typing.Any) -> typing.Generator[typing.Any, None, None]:
def as_iterable(obj: typing.Any) -> typing.Generator[typing.Any, None, None]:
"""Returns an iterable object from a single object or a list of objects
Args:
@ -72,7 +75,7 @@ def is_iterable(obj: typing.Any) -> typing.Generator[typing.Any, None, None]:
if not obj:
return
if isinstance(obj, (str, bytes)):
if isinstance(obj, (str, bytes, dict)):
yield obj
else:
if isinstance(obj, collections.abc.Iterable):

View File

@ -83,30 +83,30 @@ def detect_os(
break
else:
# Try to detect browser from User-Agent
match = None
found = None
browser_type = None
for browser_type, rules in consts.os.BROWSER_RULES.items():
must, must_not = rules
for must_re in consts.os.BROWSERS_RE[must]:
match = must_re.search(ua)
if match is None:
found = must_re.search(ua)
if found is None:
continue
# Check against no maching rules
for mustNotREs in must_not:
for cre in consts.os.BROWSERS_RE[mustNotREs]:
if cre.search(ua) is not None:
match = None
found = None
break
if match is None:
if found is None:
break
if match is not None:
if found is not None:
break
if match is not None:
if found is not None:
break
if match is not None:
if found is not None:
res.browser = browser_type or types.os.KnownBrowser.OTHER
res.version = '0.0'

View File

@ -213,7 +213,7 @@ def validate_path(
return path
def validate_port(port: typing.Union[str, int], field_name: typing.Optional[str] = None) -> int:
def validate_port(port: typing.Union[str, int], *, field_name: typing.Optional[str] = None, valid_default: typing.Optional[int] = None) -> int:
"""
Validates that a port number is valid
@ -226,6 +226,8 @@ def validate_port(port: typing.Union[str, int], field_name: typing.Optional[str]
Raises:
exceptions.ValidationException: if port is not valid
"""
if valid_default is not None and port == valid_default:
return valid_default
return validate_numeric(port, min_value=1, max_value=65535, field_name=field_name or 'Port')

View File

@ -72,7 +72,7 @@ def checkResult(lst: typing.Any) -> tuple[collections.abc.Mapping[str, typing.An
return xml2dict.parse(checkResultRaw(lst)), lst[1]
as_iterable = ensure.is_iterable
as_iterable = ensure.as_iterable
class OpenNebulaClient: # pylint: disable=too-many-public-methods

View File

@ -332,7 +332,7 @@ class ProxmoxClient:
minMemory (int, optional): Minimum memory required. Defaults to 0.
mustHaveVGPUS (typing.Optional[bool], optional): If the node must have VGPUS. True, False or None (don't care). Defaults to None.
'''
best = types.NodeStats.empty()
best = types.NodeStats.null()
node: types.NodeStats
# Function to calculate the weight of a node

View File

@ -84,7 +84,7 @@ class NodeStats(typing.NamedTuple):
return _from_dict(NodeStats, dictionary)
@staticmethod
def empty() -> 'NodeStats':
def null() -> 'NodeStats':
return NodeStats(
name='',
status='offline',

View File

@ -631,7 +631,7 @@ class XenClient: # pylint: disable=too-many-public-methods
snapshots = self.VM.get_snapshots(vm_opaque_ref)
if not full_info:
return [xen_types.VMInfo.empty(snapshot) for snapshot in snapshots]
return [xen_types.VMInfo.null(snapshot) for snapshot in snapshots]
# Return full info, thatis, name, id and snapshot_time
return sorted(

View File

@ -379,7 +379,7 @@ class VMInfo:
)
@staticmethod
def empty(opaque_ref: str) -> 'VMInfo':
def null(opaque_ref: str) -> 'VMInfo':
return VMInfo(
opaque_ref=opaque_ref,
uuid='',