1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Finised field upgrade mechaninsm

This commit is contained in:
Adolfo Gómez García 2024-01-25 03:37:57 +01:00
parent d2c466c542
commit f034f5eb7f
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
6 changed files with 81 additions and 59 deletions

View File

@ -172,7 +172,7 @@ class Authenticator(Module):
def __init__(
self,
environment: 'Environment',
values: typing.Optional[dict[str, str]],
values: typing.Optional[dict[str, str]] = None,
uuid: typing.Optional[str] = None,
):
"""

View File

@ -44,7 +44,7 @@ class Serializable:
- Read values from seralized data
"""
_flag_for_remarshal: bool
_needs_upgrade: bool
# Note:
# We can include a "data" member variable in the class
@ -52,7 +52,7 @@ class Serializable:
# on marshal and unmarshal methods
def __init__(self):
self._flag_for_remarshal = False
self._needs_upgrade = False
def marshal(self) -> bytes:
"""
@ -111,7 +111,7 @@ class Serializable:
# For remarshalling purposes
# These allows us to faster migration of old data formats to new ones
# alowing us to remove old format support as soon as possible
def flag_for_remarshalling(self, value: bool = True) -> None:
def flag_for_upgrade(self, value: bool = True) -> None:
"""
Flags this object for remarshalling
@ -123,10 +123,10 @@ class Serializable:
will not be remarshalled if not appropriate (basically, it's remarshalled on
get_instance unserialize method call)
"""
self._flag_for_remarshal = value
self._needs_upgrade = value
def needs_remarshalling(self) -> bool:
def needs_upgrade(self) -> bool:
"""
Returns true if this object needs remarshalling
"""
return self._flag_for_remarshal
return self._needs_upgrade

View File

@ -534,7 +534,7 @@ class gui:
validators.validate_hostname(self.value)
elif pattern == types.ui.FieldPatternType.HOST:
try:
validators.validate_hostname(self.value, allowDomain=True)
validators.validate_hostname(self.value, domain_allowed=True)
except exceptions.ui.ValidationError:
validators.validate_ip(self.value)
elif pattern == types.ui.FieldPatternType.PATH:

View File

@ -350,3 +350,15 @@ def macs_range_field(
tab=tab or types.ui.Tab.ADVANCED,
old_field_name='macsRange',
)
def mfa_attr_field(order: int = 20, tab: 'types.ui.Tab|str|None' = None) -> ui.gui.TextField:
return ui.gui.TextField(
length=2048,
lines=2,
label=_('MFA attribute'),
order=order,
tooltip=_('Attribute from where to extract the MFA code'),
required=False,
tab=tab or types.ui.Tab.MFA,
old_field_name='mfaAttr',
)

View File

@ -53,7 +53,7 @@ def validate_numeric(
value: typing.Union[str, int],
min_value: typing.Optional[int] = None,
max_value: typing.Optional[int] = None,
fieldName: typing.Optional[str] = None,
field_name: typing.Optional[str] = None,
) -> int:
"""
Validates that a numeric value is valid
@ -65,55 +65,57 @@ def validate_numeric(
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
value = str(value).replace(' ', '')
fieldName = fieldName or _('Numeric')
field_name = field_name or _('Numeric')
try:
numeric = int(value)
if min_value is not None and numeric < min_value:
raise exceptions.ui.ValidationError(
_('{0} must be greater than or equal to {1}').format(fieldName, min_value)
_('{0} must be greater than or equal to {1}').format(field_name, min_value)
)
if max_value is not None and numeric > max_value:
raise exceptions.ui.ValidationError(
_('{0} must be lower than or equal to {1}').format(fieldName, max_value)
_('{0} must be lower than or equal to {1}').format(field_name, max_value)
)
value = str(numeric)
except ValueError:
raise exceptions.ui.ValidationError(_('{0} contains invalid characters').format(fieldName)) from None
raise exceptions.ui.ValidationError(_('{0} contains invalid characters').format(field_name)) from None
return int(value)
def validate_hostname(hostname: str, maxLength: int = 64, allowDomain=False) -> str:
if len(hostname) > maxLength:
def validate_hostname(hostname: str, max_length: int = 64, domain_allowed=False, field_name: typing.Optional[str] = None) -> str:
field_name = f' (On field {field_name})' if field_name else ''
if len(hostname) > max_length:
raise exceptions.ui.ValidationError(
_('{} is not a valid hostname: maximum host name length exceeded.').format(hostname)
_('{} is not a valid hostname: maximum host name length exceeded.').format(hostname + field_name)
)
if not allowDomain:
if not domain_allowed:
if '.' in hostname:
raise exceptions.ui.ValidationError(
_('{} is not a valid hostname: (domains not allowed)').format(hostname)
_('{} is not a valid hostname: (domains not allowed)').format(hostname + field_name)
)
allowed = re.compile(r'(?!-)[A-Z\d-]{1,63}(?<!-)$', re.IGNORECASE)
if not all(allowed.match(x) for x in hostname.split(".")):
raise exceptions.ui.ValidationError(_('{} is not a valid hostname: (invalid characters)').format(hostname))
raise exceptions.ui.ValidationError(_('{} is not a valid hostname: (invalid characters)').format(hostname + field_name))
return hostname
def validate_fqdn(fqdn: str, maxLength: int = 255) -> str:
return validate_hostname(fqdn, maxLength, allowDomain=True)
def validate_fqdn(fqdn: str, maxLength: int = 255, field_name: typing.Optional[str] = None) -> str:
return validate_hostname(fqdn, maxLength, domain_allowed=True, field_name=field_name)
def validateUrl(url: str, maxLength: int = 1024) -> str:
def validateUrl(url: str, maxLength: int = 1024, field_name: typing.Optional[str] = None) -> str:
field_name = f' (On field {field_name})' if field_name else ''
if len(url) > maxLength:
raise exceptions.ui.ValidationError(_('{} is not a valid URL: exceeds maximum length.').format(url))
raise exceptions.ui.ValidationError(_('{} is not a valid URL: exceeds maximum length.').format(url + field_name))
try:
url_validator(url)
@ -123,64 +125,68 @@ def validateUrl(url: str, maxLength: int = 1024) -> str:
return url
def validate_ipv4(ipv4: str) -> str:
def validate_ipv4(ipv4: str, field_name: typing.Optional[str] = None) -> str:
"""
Validates that a ipv4 address is valid
:param ipv4: ipv4 address to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
field_name = f' (On field {field_name})' if field_name else ''
try:
dj_validators.validate_ipv4_address(ipv4)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid IPv4 address').format(ipv4)) from None
raise exceptions.ui.ValidationError(_('{} is not a valid IPv4 address').format(ipv4 + field_name)) from None
return ipv4
def validate_ipv6(ipv6: str) -> str:
def validate_ipv6(ipv6: str, field_name: typing.Optional[str] = None) -> str:
"""
Validates that a ipv6 address is valid
:param ipv6: ipv6 address to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
field_name = f' (On field {field_name})' if field_name else ''
try:
dj_validators.validate_ipv6_address(ipv6)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid IPv6 address').format(ipv6)) from None
raise exceptions.ui.ValidationError(_('{} is not a valid IPv6 address').format(ipv6 + field_name)) from None
return ipv6
def validate_ip(ipv4_or_ipv6: str) -> str:
def validate_ip(ipv4_or_ipv6: str, field_name: typing.Optional[str] = None) -> str:
"""
Validates that a ipv4 or ipv6 address is valid
:param ipv4OrIpv6: ipv4 or ipv6 address to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
field_name = f' (On field {field_name})' if field_name else ''
try:
dj_validators.validate_ipv46_address(ipv4_or_ipv6)
except Exception:
raise exceptions.ui.ValidationError(
_('{} is not a valid IPv4 or IPv6 address').format(ipv4_or_ipv6)
_('{} is not a valid IPv4 or IPv6 address').format(ipv4_or_ipv6 + field_name)
) from None
return ipv4_or_ipv6
def validate_path(
path: str,
maxLength: int = 1024,
mustBeWindows: bool = False,
mustBeUnix: bool = False,
max_length: int = 1024,
must_be_windows: bool = False,
must_be_unix: bool = False,
field_name: typing.Optional[str] = None,
) -> str:
"""
Validates a path, if not "mustBe" is specified, it will be validated as either windows or unix.
Path must be absolute, and must not exceed maxLength
Args:
path (str): path to validate
maxLength (int, optional): max length of path. Defaults to 1024.
mustBeWindows (bool, optional): if True, path must be a windows path. Defaults to False.
mustBeUnix (bool, optional): if True, path must be a unix path. Defaults to False.
max_length (int, optional): max length of path. Defaults to 1024.
must_be_windows (bool, optional): if True, path must be a windows path. Defaults to False.
must_be_unix (bool, optional): if True, path must be a unix path. Defaults to False.
Raises:
exceptions.ValidationException: if path is not valid
@ -188,26 +194,27 @@ def validate_path(
Returns:
str: path
"""
if len(path) > maxLength:
raise exceptions.ui.ValidationError(_('{} exceeds maximum path length.').format(path))
field_name = f' (On field {field_name})' if field_name else ''
if len(path) > max_length:
raise exceptions.ui.ValidationError(_('{} exceeds maximum path length.').format(path + field_name))
valid_for_windows = re.compile(r'^[a-zA-Z]:\\.*$')
valid_for_unix = re.compile(r'^/.*$')
if mustBeWindows:
if must_be_windows:
if not valid_for_windows.match(path):
raise exceptions.ui.ValidationError(_('{} is not a valid windows path').format(path))
elif mustBeUnix:
raise exceptions.ui.ValidationError(_('{} is not a valid windows path').format(path + field_name))
elif must_be_unix:
if not valid_for_unix.match(path):
raise exceptions.ui.ValidationError(_('{} is not a valid unix path').format(path))
raise exceptions.ui.ValidationError(_('{} is not a valid unix path').format(path + field_name))
else:
if not valid_for_windows.match(path) and not valid_for_unix.match(path):
raise exceptions.ui.ValidationError(_('{} is not a valid path').format(path))
raise exceptions.ui.ValidationError(_('{} is not a valid path').format(path + field_name))
return path
def validate_port(port: typing.Union[str, int]) -> int:
def validate_port(port: typing.Union[str, int], field_name: typing.Optional[str] = None) -> int:
"""
Validates that a port number is valid
@ -220,10 +227,10 @@ def validate_port(port: typing.Union[str, int]) -> int:
Raises:
exceptions.ValidationException: if port is not valid
"""
return validate_numeric(port, min_value=1, max_value=65535, fieldName='Port')
return validate_numeric(port, min_value=1, max_value=65535, field_name=field_name or 'Port')
def validate_host(host: str) -> str:
def validate_host(host: str, field_name: typing.Optional[str] = None) -> str:
"""
Validates that a host is valid
:param host: host to validate
@ -233,16 +240,17 @@ def validate_host(host: str) -> str:
dj_validators.validate_ipv46_address(host)
return host
except Exception:
return validate_fqdn(host)
return validate_fqdn(host, field_name=field_name)
def validate_host_port(host_port_pair: str) -> tuple[str, int]:
def validate_host_port(host_port_pair: str, field_name: typing.Optional[str] = None) -> tuple[str, int]:
"""
Validates that a host:port pair is valid
:param hostPortPair: host:port pair to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
field_name = f' (On field {field_name})' if field_name else ''
try:
if '[' in host_port_pair and ']' in host_port_pair: # IPv6
host, port = host_port_pair.split(']:')
@ -256,26 +264,27 @@ def validate_host_port(host_port_pair: str) -> tuple[str, int]:
except Exception:
return validate_hostname(host, 255, False), validate_port(port)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid host:port pair').format(host_port_pair)) from None
raise exceptions.ui.ValidationError(_('{} is not a valid host:port pair').format(host_port_pair + field_name)) from None
def validate_timeout(timeOutStr: str) -> int:
def validate_timeout(timeout: 'str|int', field_name: typing.Optional[str] = None) -> int:
"""
Validates that a timeout value is valid
:param timeOutStr: timeout to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
return validate_numeric(timeOutStr, min_value=0, fieldName='Timeout')
return validate_numeric(timeout, min_value=0, field_name=field_name or 'Timeout')
def validate_mac(mac: str) -> str:
def validate_mac(mac: str, field_name: typing.Optional[str] = None) -> str:
"""
Validates that a mac address is valid
:param mac: mac address to validate
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
# Removes white spaces and all to uppercase
field_name = f' (On field {field_name})' if field_name else ''
mac = mac.upper().replace(' ', '')
macRE = re.compile(
@ -283,23 +292,24 @@ def validate_mac(mac: str) -> str:
) # In fact, it could be XX-XX-XX-XX-XX-XX, but we use - as range separator
if macRE.match(mac) is None:
raise exceptions.ui.ValidationError(_('{} is not a valid MAC address').format(mac))
raise exceptions.ui.ValidationError(_('{} is not a valid MAC address').format(mac + field_name))
return mac
def validate_mac_range(macRange: str) -> str:
def validate_mac_range(macRange: str, field_name: typing.Optional[str] = None) -> str:
"""
Corrects mac range (uppercase, without spaces), and checks that is range is valid
:param macRange: Range to fix
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
field_name = f' (On field {field_name})' if field_name else ''
try:
macRangeStart, macRangeEnd = macRange.split('-')
validate_mac(macRangeStart)
validate_mac(macRangeEnd)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid MAC range').format(macRange)) from None
raise exceptions.ui.ValidationError(_('{} is not a valid MAC range').format(macRange + field_name)) from None
return macRange
@ -348,7 +358,7 @@ def validate_basename(baseName: str, length: int = -1) -> str:
return baseName
def validate_json(jsonData: typing.Optional[str]) -> typing.Any:
def validate_json(json_data: typing.Optional[str]) -> typing.Any:
"""
Validates that a json data is valid (or empty)
@ -361,10 +371,10 @@ def validate_json(jsonData: typing.Optional[str]) -> typing.Any:
Returns:
typing.Any: Json data as python object
"""
if not jsonData:
if not json_data:
return None
try:
return json.loads(jsonData)
return json.loads(json_data)
except Exception:
raise exceptions.ui.ValidationError(_('Invalid JSON data')) from None

View File

@ -80,11 +80,11 @@ class ManagedObjectModel(UUIDModel):
if not values and self.data:
obj.deserialize(self.data)
if obj.needs_remarshalling():
if obj.needs_upgrade():
# Re-serialize to db
self.data = obj.serialize()
self.save(update_fields=['data'])
obj.flag_for_remarshalling(False)
obj.flag_for_upgrade(False)
self._cached_instance = None # Ensures returns correct value on get_instance