1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Added tests for validate initialization by ip also

This commit is contained in:
Adolfo Gómez García 2024-09-10 09:57:05 +02:00
parent 4c755f548e
commit ab1e807f36
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
2 changed files with 192 additions and 39 deletions

View File

@ -36,6 +36,7 @@ from uds import models
from uds.core.consts.system import VERSION
from ...utils import rest
from ...fixtures import services as services_fixtures
logger = logging.getLogger(__name__)
@ -50,7 +51,9 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
self,
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
token: str,
mac: str,
*,
mac: typing.Optional[str] = None,
ip: typing.Optional[str] = None,
) -> dict[str, typing.Any]:
response = self.client.post(
'/uds/rest/actor/v3/initialize',
@ -58,11 +61,11 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
'type': type_,
'version': VERSION,
'token': token,
'id': [{'mac': mac, 'ip': '1.2.3.4'}],
'id': [{'mac': mac or '42:AC:11:22:33', 'ip': ip or '1.2.3.4'}],
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIsInstance(data['result'], dict)
return data['result']
@ -71,8 +74,10 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
self,
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
token: str,
mac: str,
expectForbbiden: bool,
*,
mac: typing.Optional[str] = None,
ip: typing.Optional[str] = None,
expect_forbidden: bool = False,
) -> dict[str, typing.Any]:
response = self.client.post(
'/uds/rest/actor/v3/initialize',
@ -80,19 +85,19 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
'type': type_,
'version': VERSION,
'token': token,
'id': [{'mac': mac, 'ip': '4.3.2.1'}],
'id': [{'mac': mac or '42:AC:33:22:11', 'ip': ip or '4.3.2.1'}],
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200 if not expectForbbiden else 403)
if expectForbbiden:
self.assertEqual(response.status_code, 200 if not expect_forbidden else 403)
if expect_forbidden:
return {}
data = response.json()
self.assertIsInstance(data['result'], dict)
return data['result']
def test_initialize_managed(self) -> None:
def test_initialize_managed_by_mac(self) -> None:
"""
Test actor initialize v3 for managed actor
"""
@ -100,13 +105,13 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
actor_token = self.login_and_register()
# Get the user service unique_id
unique_id = self.user_service_managed.get_unique_id()
# Get the user service unique_id, the default
unique_id = user_service.get_unique_id()
success = functools.partial(self.invoke_success, 'managed', actor_token)
failure = functools.partial(self.invoke_failure, 'managed')
success = functools.partial(self.invoke_success, 'managed', actor_token, ip='1.2.3.4')
failure = functools.partial(self.invoke_failure, 'managed', ip='1.2.3.4')
result = success(unique_id)
result = success(mac=unique_id)
# Ensure own token is assigned
self.assertEqual(result['token'], user_service.uuid)
@ -125,28 +130,80 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
self.assertEqual(os['name'], user_service.friendly_name)
# Now invoke failure
failure('invalid token', unique_id, True)
failure('invalid token', mac=unique_id, expect_forbidden=True)
# Now invoke failure with valid token but invalid mac
result = failure(actor_token, 'invalid mac', False)
result = failure(actor_token, mac='invalid mac', expect_forbidden=False)
self.assertIsNone(result['own_token'])
self.assertIsNone(result['token'])
self.assertIsNone(result['os'])
self.assertIsNone(result['unique_id'])
def test_initialize_unmanaged(self) -> None:
def test_initialize_managed_by_ip(self) -> None:
"""
Test actor initialize v3 for managed actor, same as previous but using ip instead of mac
"""
user_service = services_fixtures.create_db_one_assigned_userservice(
self.provider,
self.admins[0],
self.groups,
'managed',
)
# Set an IP as unique_id
unique_id = '1.2.3.4'
user_service.unique_id = unique_id
user_service.save()
actor_token = self.login_and_register()
success = functools.partial(self.invoke_success, 'managed', actor_token, mac='42:AC:99:99:99')
failure = functools.partial(self.invoke_failure, 'managed', mac='42:AC:99:99:99')
result = success(ip=unique_id)
# Ensure own token is assigned
self.assertEqual(result['token'], user_service.uuid)
self.assertEqual(result['own_token'], result['token']) # Compat value with 3.x actors
# Ensure unique_id detected is ours
self.assertEqual(result['unique_id'], unique_id)
# Ensure os is set and it is a dict
self.assertIsInstance(result['os'], dict)
os = result['os']
# Ensure requested action is rename
self.assertEqual(os['action'], 'rename')
# And name is userservice name
self.assertEqual(os['name'], user_service.friendly_name)
# Now invoke failure
failure('invalid token', ip=unique_id, expect_forbidden=True)
# Now invoke failure with valid token but invalid ip
result = failure(actor_token, ip='invalid ip', expect_forbidden=False)
self.assertIsNone(result['own_token'])
self.assertIsNone(result['token'])
self.assertIsNone(result['os'])
self.assertIsNone(result['unique_id'])
def test_initialize_unmanaged_by_mac(self) -> None:
"""
Test actor initialize v3 for unmanaged actor
"""
user_service = self.user_service_unmanaged
actor_token: str = (user_service.deployed_service.service.token if user_service.deployed_service.service else None) or ''
actor_token: str = (
user_service.deployed_service.service.token if user_service.deployed_service.service else None
) or ''
unique_id = user_service.get_unique_id()
success = functools.partial(self.invoke_success, 'unmanaged')
failure = functools.partial(self.invoke_failure, 'unmanaged')
TEST_MAC: typing.Final[str] = '00:00:00:00:00:00'
# This will succeed, but only alias token is returned because MAC is not registered by UDS
@ -160,11 +217,10 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
self.assertEqual(result['token'], result['own_token'])
self.assertIsNone(result['unique_id'])
self.assertIsNone(result['os'])
# Store alias token for later tests
alias_token = result['token']
# If repeated, same token is returned
result = success(
actor_token,
@ -198,4 +254,78 @@ class ActorInitializeTest(rest.test.RESTActorTestCase):
self.assertEqual(result['unique_id'], unique_id)
#
failure('invalid token', unique_id, True)
failure('invalid token', mac=unique_id, expect_forbidden=True)
def test_initialize_unmanaged_by_ip(self) -> None:
"""
Test actor initialize v3 for unmanaged actor
"""
user_service = services_fixtures.create_db_one_assigned_userservice(
self.provider,
self.admins[0],
self.groups,
'unmanaged',
)
# Set an IP as unique_id
unique_id = '1.2.3.4'
user_service.unique_id = unique_id
user_service.save()
actor_token: str = (
user_service.deployed_service.service.token if user_service.deployed_service.service else None
) or ''
success = functools.partial(self.invoke_success, 'unmanaged', mac='00:00:00:00:00:00')
failure = functools.partial(self.invoke_failure, 'unmanaged', mac='00:00:00:00:00:00')
TEST_IP: typing.Final[str] = '00:00:00:00:00:00'
# This will succeed, but only alias token is returned because MAC is not registered by UDS
result = success(
actor_token,
ip=TEST_IP,
)
# Unmanaged host is the response for initialization of unmanaged actor ALWAYS
self.assertIsInstance(result['token'], str)
self.assertEqual(result['token'], result['own_token'])
self.assertIsNone(result['unique_id'])
self.assertIsNone(result['os'])
# Store alias token for later tests
alias_token = result['token']
# If repeated, same token is returned
result = success(
actor_token,
ip=TEST_IP,
)
self.assertEqual(result['token'], alias_token)
# Now, invoke a "nice" initialize
result = success(
actor_token,
ip=unique_id,
)
token = result['token']
self.assertIsInstance(token, str)
self.assertEqual(token, user_service.uuid)
self.assertEqual(token, result['own_token'])
self.assertEqual(result['unique_id'], unique_id)
# Ensure that the alias returned is on alias db, and it points to the same service as the one we belong to
alias = models.ServiceTokenAlias.objects.get(alias=alias_token)
self.assertEqual(alias.service, user_service.deployed_service.service)
# Now, we should be able to "initialize" with valid mac and with original and alias tokens
# If we call initialize and we get "own-token" means that we have already logged in with this data
result = success(alias_token, ip=unique_id)
self.assertEqual(result['token'], user_service.uuid)
self.assertEqual(result['token'], result['own_token'])
self.assertEqual(result['unique_id'], unique_id)
#
failure('invalid token', ip=unique_id, expect_forbidden=True)

View File

@ -86,7 +86,9 @@ def validate_numeric(
return int(value)
def validate_hostname(hostname: str, max_length: int = 64, domain_allowed: bool=False, field_name: typing.Optional[str] = None) -> str:
def validate_hostname(
hostname: str, max_length: int = 64, domain_allowed: bool = False, field_name: typing.Optional[str] = None
) -> str:
field_name = f' (On field {field_name})' if field_name else ''
if len(hostname) > max_length:
raise exceptions.ui.ValidationError(
@ -102,7 +104,9 @@ def validate_hostname(hostname: str, max_length: int = 64, domain_allowed: bool=
allowed = re.compile(r'(?!-)[A-Z\d-]{1,63}(?<!-)$', re.IGNORECASE)
if not all(allowed.match(x) for x in hostname.split(".")):
raise exceptions.ui.ValidationError(_('{} is not a valid hostname: (invalid characters)').format(hostname + field_name))
raise exceptions.ui.ValidationError(
_('{} is not a valid hostname: (invalid characters)').format(hostname + field_name)
)
return hostname
@ -114,7 +118,9 @@ def validate_fqdn(fqdn: str, maxLength: int = 255, field_name: typing.Optional[s
def validateUrl(url: str, maxLength: int = 1024, field_name: typing.Optional[str] = None) -> str:
field_name = f' (On field {field_name})' if field_name else ''
if len(url) > maxLength:
raise exceptions.ui.ValidationError(_('{} is not a valid URL: exceeds maximum length.').format(url + field_name))
raise exceptions.ui.ValidationError(
_('{} is not a valid URL: exceeds maximum length.').format(url + field_name)
)
try:
url_validator(url)
@ -135,7 +141,9 @@ def validate_ipv4(ipv4: str, field_name: typing.Optional[str] = None) -> str:
try:
dj_validators.validate_ipv4_address(ipv4)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid IPv4 address').format(ipv4 + field_name)) from None
raise exceptions.ui.ValidationError(
_('{} is not a valid IPv4 address').format(ipv4 + field_name)
) from None
return ipv4
@ -150,7 +158,9 @@ def validate_ipv6(ipv6: str, field_name: typing.Optional[str] = None) -> str:
try:
dj_validators.validate_ipv6_address(ipv6)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid IPv6 address').format(ipv6 + field_name)) from None
raise exceptions.ui.ValidationError(
_('{} is not a valid IPv6 address').format(ipv6 + field_name)
) from None
return ipv6
@ -213,7 +223,12 @@ def validate_path(
return path
def validate_port(port: typing.Union[str, int], *, field_name: typing.Optional[str] = None, valid_default: typing.Optional[int] = None) -> int:
def validate_port(
port: typing.Union[str, int],
*,
field_name: typing.Optional[str] = None,
valid_default: typing.Optional[int] = None,
) -> int:
"""
Validates that a port number is valid
@ -265,7 +280,9 @@ def validate_host_port(host_port_pair: str, field_name: typing.Optional[str] = N
except Exception:
return validate_hostname(host, 255, True), validate_port(port)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid host:port pair').format(host_port_pair + field_name)) from None
raise exceptions.ui.ValidationError(
_('{} is not a valid host:port pair').format(host_port_pair + field_name)
) from None
def validate_timeout(timeout: 'str|int', field_name: typing.Optional[str] = None) -> int:
@ -310,7 +327,9 @@ def validate_mac_range(macRange: str, field_name: typing.Optional[str] = None) -
validate_mac(macRangeStart)
validate_mac(macRangeEnd)
except Exception:
raise exceptions.ui.ValidationError(_('{} is not a valid MAC range').format(macRange + field_name)) from None
raise exceptions.ui.ValidationError(
_('{} is not a valid MAC range').format(macRange + field_name)
) from None
return macRange
@ -399,6 +418,7 @@ def validate_certificate(cert: typing.Optional[str]) -> str:
raise exceptions.ui.ValidationError(_('Invalid certificate'))
return cert
def validate_private_key(key: typing.Optional[str]) -> str:
"""
Validates that a private key is valid
@ -419,22 +439,25 @@ def validate_private_key(key: typing.Optional[str]) -> str:
return key
def split_with_separator(text: str, separator: str) -> list[str]:
parts = text.split(separator)
# Reconstruct the list with the separator included
result = [part + separator for part in parts[:-1]] + [parts[-1]]
return result
def validate_server_certificate_multiple(value: typing.Optional[str]) -> str:
"""
Validates the multi line fields refering to attributes
"""
if not value:
return '' # Ok, empty
raise exceptions.ui.ValidationError(_('Certificate is empty'))
pemCerts = value.split('-----END CERTIFICATE-----')
# Remove empty strings
pemCerts = [cert for cert in pemCerts if cert.strip() != '']
# Add back the "-----END CERTIFICATE-----" part
pemCerts = [cert + '-----END CERTIFICATE-----' for cert in pemCerts]
pem_certs = [cert for cert in split_with_separator(value, '-----END CERTIFICATE-----') if cert.strip()]
for pemCert in pemCerts:
for pem_cert in pem_certs:
try:
load_pem_x509_certificate(pemCert.encode())
load_pem_x509_certificate(pem_cert.encode())
except Exception as e:
raise exceptions.ui.ValidationError(_('Invalid certificate') + f' :{e}') from e