1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Working on openstack tests for new model

This commit is contained in:
Adolfo Gómez García 2024-06-16 18:59:59 +02:00
parent fe91ecbad6
commit 98ff066fe7
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
8 changed files with 239 additions and 185 deletions

View File

@ -48,7 +48,8 @@ class EnsureTest(UDSTestCase):
self.assertEqual(ensure.as_list(1), [1])
self.assertEqual(ensure.as_list('111'), ['111'])
self.assertEqual(ensure.as_list(None), [])
self.assertEqual(ensure.as_list({}), [])
self.assertEqual(ensure.as_list(set()), [])
self.assertEqual(ensure.as_list({}), [{}]) # Dict into list of dict
self.assertEqual(ensure.as_list({1, 2, 3}), [1, 2, 3])

View File

@ -41,7 +41,8 @@ from unittest import mock
from uds.core import environment, types
from uds.core.ui.user_interface import gui
from ...utils.autospec import autospec, AutoSpecMethodInfo
from tests.utils.autospec import autospec, AutoSpecMethodInfo
from tests.utils import search_item_by_attr
from uds.services.OpenStack import (
provider,
@ -230,17 +231,20 @@ CONSOLE_CONNECTION_INFO: types.services.ConsoleConnectionInfo = types.services.C
T = typing.TypeVar('T')
def get_id(iterable: typing.Iterable[T], id: str) -> T:
try:
return next(filter(lambda x: x.id == id, iterable)) # type: ignore
except StopIteration:
raise ValueError(f'Id {id} not found in iterable') from None
def set_all_vms_status(status: openstack_types.ServerStatus) -> None:
for vm in SERVERS_LIST:
vm.status = status
def search_id(lst: list[T], id: str, *args: typing.Any, **kwargs: typing.Any) -> T:
return search_item_by_attr(lst, 'id', id)
def set_vm_state(id: str, state: openstack_types.PowerState, **kwargs: typing.Any) -> str:
vm = search_id(SERVERS_LIST, id)
vm.power_state = state
return str(state) + '_task_uuid'
def random_element(lst: list[T], *args: typing.Any, **kwargs: typing.Any) -> T:
return random.choice(lst)
# Methods that returns None or "internal" methods are not tested
# The idea behind this is to allow testing the provider, service and deployment classes
@ -266,37 +270,38 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.get_server,
returns=lambda server_id: get_id(SERVERS_LIST, server_id), # pyright: ignore
), # pyright: ignore
returns=search_id,
partial_args=(SERVERS_LIST,),
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.get_volume,
returns=lambda volume_id: get_id(VOLUMES_LIST, volume_id), # pyright: ignore
returns=search_id,
partial_args=(VOLUMES_LIST,),
), # pyright: ignore
AutoSpecMethodInfo(
openstack_client.OpenstackClient.get_volume_snapshot,
returns=lambda snapshot_id: get_id(VOLUME_SNAPSHOTS_LIST, snapshot_id), # pyright: ignore
returns=search_id,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
), # pyright: ignore
AutoSpecMethodInfo(
openstack_client.OpenstackClient.update_snapshot,
returns=lambda snapshot_id, name, description: get_id( # pyright: ignore
VOLUME_SNAPSHOTS_LIST, snapshot_id # pyright: ignore
),
returns=search_id,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.create_volume_snapshot,
returns=lambda volume_id, name, description: random.choice( # pyright: ignore
VOLUME_SNAPSHOTS_LIST,
),
returns=random_element,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.create_volume_from_snapshot,
returns=lambda snapshot_id, name, description: get_id( # pyright: ignore
VOLUMES_LIST, f'vid{len(VOLUMES_LIST) + 1}'
),
returns=random_element,
partial_args=(VOLUMES_LIST,),
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.create_server_from_snapshot,
returns=lambda *args, **kwargs: random.choice(SERVERS_LIST), # pyright: ignore
returns=random_element,
partial_args=(SERVERS_LIST,),
),
AutoSpecMethodInfo(
openstack_client.OpenstackClient.test_connection,

View File

@ -33,9 +33,11 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import pickle
import typing
from uds.core.environment import Environment
from uds.core import types as core_types
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.environment import Environment
from uds.services.OpenStack import deployment as deployment
@ -51,24 +53,25 @@ from uds.services.OpenStack import deployment as deployment
# self._vmid = vals[4].decode('utf8')
# self._reason = vals[5].decode('utf8')
# self._queue = pickle.loads(vals[6]) # nosec: not insecure, we are loading our own data
EXPECTED_FIELDS: typing.Final[set[str]] = {
EXPECTED_OWN_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
'_check_count'
}
TEST_QUEUE: typing.Final[list[deployment.OldOperation]] = [
OLD_TEST_QUEUE: typing.Final[list[deployment.OldOperation]] = [
deployment.OldOperation.CREATE,
deployment.OldOperation.REMOVE,
deployment.OldOperation.RETRY,
]
TEST_QUEUE = [i.to_operation() for i in OLD_TEST_QUEUE]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE, protocol=0),
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01' + pickle.dumps(OLD_TEST_QUEUE, protocol=0),
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
@ -115,8 +118,6 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
def test_marshaling_queue(self) -> None:
# queue is kept on "storage", so we need always same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.save_pickled('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.OpenStackLiveUserService:
instance = deployment.OpenStackLiveUserService(environment=environment, service=None) # type: ignore
@ -125,29 +126,29 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE)
self.assertEqual(instance._queue, TEST_QUEUE) # Always unmarshalled as new format
instance._queue = [
deployment.OldOperation.CREATE,
deployment.OldOperation.FINISH,
core_types.services.Operation.CREATE,
core_types.services.Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[deployment.OldOperation.CREATE, deployment.OldOperation.FINISH],
[core_types.services.Operation.CREATE, core_types.services.Operation.FINISH],
)
# Append something remarshall and check
instance._queue.insert(0, deployment.OldOperation.RETRY)
instance._queue.insert(0, core_types.services.Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[
deployment.OldOperation.RETRY,
deployment.OldOperation.CREATE,
deployment.OldOperation.FINISH,
core_types.services.Operation.RETRY,
core_types.services.Operation.CREATE,
core_types.services.Operation.FINISH,
],
)
# Remove something remarshall and check
@ -156,7 +157,7 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[deployment.OldOperation.CREATE, deployment.OldOperation.FINISH],
[core_types.services.Operation.CREATE, core_types.services.Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
@ -165,4 +166,8 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
with Environment.temporary_environment() as env:
instance = deployment.OpenStackLiveUserService(environment=env, service=None) # type: ignore
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)
self.assertTrue(
EXPECTED_OWN_FIELDS <= set(f[0] for f in instance._autoserializable_fields()),
'Missing fields: '
+ str(EXPECTED_OWN_FIELDS - set(f[0] for f in instance._autoserializable_fields())),
)

View File

@ -74,38 +74,38 @@ class TestOpenstackService(UDSTransactionTestCase):
network_id=service.network.value,
security_groups_ids=service.security_groups.value,
)
data = service.get_machine_status(fixtures.SERVERS_LIST[0].id)
data = service.api.get_server(fixtures.SERVERS_LIST[0].id).status
self.assertIsInstance(data, openstack_types.ServerStatus)
api.get_server.assert_called_once_with(fixtures.SERVERS_LIST[0].id)
# Reset mocks, get server should be called again
api.reset_mock()
data = service.get_machine_power_state(fixtures.SERVERS_LIST[0].id)
data = service.api.get_server(fixtures.SERVERS_LIST[0].id).power_state
self.assertIsInstance(data, openstack_types.PowerState)
api.get_server.assert_called_once_with(fixtures.SERVERS_LIST[0].id)
server = fixtures.SERVERS_LIST[0]
service.start_machine(server.id)
service.api.start_server(server.id)
server.power_state = openstack_types.PowerState.SHUTDOWN
api.start_server.assert_called_once_with(server.id)
server.power_state = openstack_types.PowerState.RUNNING
service.stop_machine(server.id)
service.api.stop_server(server.id)
api.stop_server.assert_called_once_with(server.id)
server.power_state = openstack_types.PowerState.RUNNING
service.suspend_machine(server.id)
service.api.suspend_server(server.id)
api.suspend_server.assert_called_once_with(server.id)
server.power_state = openstack_types.PowerState.SUSPENDED
service.resume_machine(server.id)
service.api.resume_server(server.id)
api.resume_server.assert_called_once_with(server.id)
service.reset_machine(server.id)
service.api.reset_server(server.id)
api.reset_server.assert_called_once_with(server.id)
service.delete_machine(server.id)
service.api.delete_server(server.id)
api.delete_server.assert_called_once_with(server.id)
self.assertTrue(service.is_avaliable())
@ -114,4 +114,4 @@ class TestOpenstackService(UDSTransactionTestCase):
self.assertEqual(service.get_basename(), service.basename.value)
self.assertEqual(service.get_lenname(), service.lenname.value)
self.assertEqual(service.allows_errored_userservice_cleanup(), not service.maintain_on_error.value)
self.assertEqual(service.keep_on_error(), service.maintain_on_error.value)
self.assertEqual(service.should_maintain_on_error(), service.maintain_on_error.value)

View File

@ -31,182 +31,225 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import contextlib
from uds import models
from uds.core import types
from unittest import mock
from uds.services.OpenStack.openstack import types as openstack_types
from uds.services.OpenStack.deployment import OldOperation
from . import fixtures
from tests.utils import MustBeOfType
from ...utils.test import UDSTransactionTestCase
from ...utils.helpers import limited_iterator
if typing.TYPE_CHECKING:
from uds.services.OpenStack.deployment import OpenStackLiveUserService
# We use transactions on some related methods (storage access, etc...)
class TestOpenstackLiveDeployment(UDSTransactionTestCase):
def setUp(self) -> None:
pass
_old_servers: typing.List['openstack_types.ServerInfo']
# Openstack only have l1 cache. L2 is not considered useful right now
def test_userservice_cachel1_and_user(self) -> None:
def setUp(self) -> None:
# Sets all vms to running, later restore original values
self._old_servers = fixtures.SERVERS_LIST.copy()
for vm in fixtures.SERVERS_LIST:
vm.power_state = fixtures.openstack_types.PowerState.RUNNING
def tearDown(self) -> None:
fixtures.SERVERS_LIST = self._old_servers
@contextlib.contextmanager
def setup_data(self) -> typing.Iterator[
tuple[
'OpenStackLiveUserService',
mock.MagicMock,
]
]:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_live_service(provider=provider)
userservice = fixtures.create_live_userservice(service=service)
yield userservice, api
def assert_basic_calls(self, userservice: 'OpenStackLiveUserService', api: mock.MagicMock) -> None:
return
service = userservice.service()
vmid = userservice._vmid
# These are the calls that should have been done (some more, like get_task, etc.., but we are not interested in them)
api.locate_vms.assert_called_with(
userservice.get_vmname(), service.project.value
) # look for duplicate name
api.clone_vm.assert_called_with(userservice.publication().get_template_id()) # clone the template
api.update_vm.assert_called_with(
vmid,
name=userservice._name,
description=MustBeOfType(str),
vcpus=service.num_vcpus.value,
vcores=service.cores_per_vcpu.value,
enable_cpu_passthrough=True,
memory_mb=service.memory.value,
) # update the machine with the required values
api.get_vm_info.assert_called_with(vmid) # for start the machine
# Start mayby or mayby not called, if machine is already running, it will not be called
# Because we setup our fixtures to have all machines running, this will not be called
def test_userservice_linked_cache_l1(self) -> None:
"""
Test the user service
"""
# Deploy for cache and deploy for user are the same, so we will test both at the same time
for to_test in ['cache', 'user']:
for patcher in (fixtures.patched_provider, fixtures.patched_provider_legacy):
with patcher() as prov:
api = typing.cast(mock.MagicMock, prov.api())
service = fixtures.create_live_service(prov)
userservice = fixtures.create_live_userservice(service=service)
publication = userservice.publication()
publication._template_id = 'snap1'
with self.setup_data() as (userservice, api):
service = userservice.service()
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
self.assertEqual(state, types.states.TaskState.RUNNING)
if to_test == 'cache':
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
else:
state = userservice.deploy_for_user(models.User())
# Ensure that in the event of failure, we don't loop forever
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.RUNNING, f'Error on {to_test} deployment')
self.assertEqual(state, types.states.TaskState.FINISHED, userservice._error_debug_info)
# Create server should have been called
api.create_server_from_snapshot.assert_called_with(
snapshot_id='snap1',
name=userservice._name,
availability_zone=service.availability_zone.value,
flavor_id=service.flavor.value,
network_id=service.network.value,
security_groups_ids=service.security_groups.value,
)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
vmid = userservice._vmid
self.assert_basic_calls(userservice, api)
# Set power state of machine to running (userservice._vmid)
fixtures.get_id(fixtures.SERVERS_LIST, vmid).power_state = (
openstack_types.PowerState.RUNNING
)
def test_userservice_linked_cache_l2(self) -> None:
"""
Test the user service
"""
with self.setup_data() as (userservice, api):
service = userservice.service()
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L2)
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure that in the event of failure, we don't loop forever
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED, f'Error on {to_test} deployment')
# If first item in queue is WAIT, we must "simulate" the wake up from os manager
if userservice._queue[0] == types.services.Operation.WAIT:
userservice.process_ready_from_os_manager(None)
# userservice name is UDS-U-
self.assertEqual(
userservice._name[6: 6+len(service.get_basename())],
service.get_basename(),
f'Error on {to_test} deployment',
)
self.assertEqual(
len(userservice._name),
len(service.get_basename()) + service.get_lenname() + 6, # for UDS-U- prefix
f'Error on {to_test} deployment',
)
self.assertEqual(state, types.states.TaskState.FINISHED)
# Get server should have been called at least once
api.get_server.assert_called_with(vmid)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
# Mac an ip should have been set
self.assertNotEqual(userservice._mac, '', f'Error on {to_test} deployment')
self.assertNotEqual(userservice._ip, '', f'Error on {to_test} deployment')
vmid = userservice._vmid
self.assert_basic_calls(userservice, api)
# And stop the machine
api.shutdown_vm.assert_called_with(vmid)
# And queue must be finished
self.assertEqual(userservice._queue, [OldOperation.FINISH], f'Error on {to_test} deployment')
def test_userservice_linked_user(self) -> None:
"""
Test the user service
"""
with self.setup_data() as (userservice, api):
service = userservice.service()
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(
state,
types.states.TaskState.FINISHED,
f'Queue: {userservice._queue}, reason: {userservice._reason}, extra_info: {userservice._error_debug_info}',
)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
self.assert_basic_calls(userservice, api)
# Set ready state with the valid machine
state = userservice.set_ready()
# Machine is already running, must return FINISH state
# As long as the machine is not started, START, START_COMPLETED are not added to the queue
self.assertEqual(state, types.states.TaskState.FINISHED)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32):
state = userservice.check_state()
# Should be finished now
self.assertEqual(state, types.states.TaskState.FINISHED)
def test_userservice_cancel(self) -> None:
"""
Test the user service
"""
for patcher in (fixtures.patched_provider, fixtures.patched_provider_legacy):
with patcher() as prov:
service = fixtures.create_live_service(prov)
userservice = fixtures.create_live_userservice(service=service)
publication = userservice.publication()
publication._template_id = 'snap1'
state = userservice.deploy_for_user(models.User())
for graceful in [True, False]:
with self.setup_data() as (userservice, api):
service = userservice.service()
service.try_soft_shutdown.value = graceful
state = userservice.deploy_for_user(mock.MagicMock())
self.assertEqual(state, types.states.TaskState.RUNNING)
server = fixtures.get_id(fixtures.SERVERS_LIST, userservice._vmid)
server.power_state = openstack_types.PowerState.RUNNING
current_op = userservice._get_current_op()
# Invoke cancel
api.reset_mock()
state = userservice.cancel()
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure DESTROY_VALIDATOR is in the queue
self.assertIn(types.services.Operation.DESTROY_VALIDATOR, userservice._queue)
self.assertEqual(
userservice._queue,
[current_op] + [OldOperation.STOP, OldOperation.REMOVE, OldOperation.FINISH],
)
counter = 0
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
if counter > 5:
server.power_state = openstack_types.PowerState.SHUTDOWN
if counter == 8: # Stop so it can continue
fixtures.set_vm_state(userservice._vmid, fixtures.openstack_types.PowerState.SHUTDOWN)
# Now, should be finished without any problem, no call to api should have been done
self.assertEqual(state, types.states.TaskState.FINISHED, f'State: {state} {userservice._error_debug_info}')
api().get_server.assert_called()
api().stop_server.assert_called()
api().delete_server.assert_called()
# Now again, but process check_queue a couple of times before cancel
# we we have an _vmid
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
if userservice._vmid:
break
self.assertEqual(
state,
types.states.TaskState.RUNNING,
f'Queue: {userservice._queue} {userservice._error_debug_info} {userservice._reason} {userservice._vmid}',
)
# Ensure vm is running, so it gets stopped
fixtures.set_vm_state(userservice._vmid, fixtures.openstack_types.PowerState.RUNNING)
current_op = userservice._current_op()
state = userservice.cancel()
self.assertEqual(state, types.states.TaskState.RUNNING)
self.assertEqual(userservice._queue[0], current_op)
if graceful:
self.assertIn(types.services.Operation.SHUTDOWN, userservice._queue)
self.assertIn(types.services.Operation.SHUTDOWN_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.STOP, userservice._queue)
self.assertIn(types.services.Operation.STOP_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.DELETE, userservice._queue)
self.assertIn(types.services.Operation.DELETE_COMPLETED, userservice._queue)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertGreater(counter, 5)
self.assertEqual(state, types.states.TaskState.FINISHED)
def test_userservice_error(self) -> None:
"""
This test will not have keep on error active, and will create correctly
but will error on set_ready, so it will be put on error state
"""
"""
Test the user service
"""
for keep_on_error in (True, False):
for patcher in (fixtures.patched_provider, fixtures.patched_provider_legacy):
with patcher() as prov:
service = fixtures.create_live_service(prov, maintain_on_error=keep_on_error)
userservice = fixtures.create_live_userservice(service=service)
publication = userservice.publication()
publication._template_id = 'snap1'
if graceful:
api.shutdown_vm.assert_called()
else:
api.stop_vm.assert_called()
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
server = fixtures.get_id(fixtures.SERVERS_LIST, userservice._vmid)
server.power_state = openstack_types.PowerState.RUNNING
for _counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
# Correctly created
self.assertEqual(state, types.states.TaskState.FINISHED)
# We are going to force an error on set_ready
server.status = openstack_types.ServerStatus.ERROR
state = userservice.set_ready()
if keep_on_error:
self.assertEqual(state, types.states.TaskState.FINISHED)
else:
self.assertEqual(state, types.states.TaskState.ERROR)
def test_userservice_error_keep_create(self) -> None:
"""
This test will have keep on error active, and will create incorrectly
so vm will be deleted and put on error state
"""
for patcher in (fixtures.patched_provider, fixtures.patched_provider_legacy):
with patcher() as prov:
api = typing.cast(mock.MagicMock, prov.api())
service = fixtures.create_live_service(prov, maintain_on_eror=True)
userservice = fixtures.create_live_userservice(service=service)
publication = userservice.publication()
publication._template_id = 'snap1'
api.create_server_from_snapshot.side_effect = Exception('Error')
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.ERROR)
def test_userservice_basics(self) -> None:
with self.setup_data() as (userservice, _api):
userservice.set_ip('1.2.3.4')
self.assertEqual(userservice.get_ip(), '1.2.3.4')

View File

@ -117,7 +117,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
"""
return name
@abc.abstractmethod
# overridable
def find_duplicated_machines(self, name: str, mac: str) -> collections.abc.Iterable[str]:
"""
Checks if a machine with the same name or mac exists
@ -133,7 +133,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
Note:
Maybe we can only check name or mac, or both, depending on the service
"""
...
raise NotImplementedError('find_duplicated_machines must be implemented if remove_duplicates is used!')
@typing.final
def perform_find_duplicated_machines(self, name: str, mac: str) -> collections.abc.Iterable[str]:

View File

@ -508,7 +508,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def error_reason(self) -> str:
return self._reason
def remove_duplicated_name(self) -> None:
def remove_duplicated_names(self) -> None:
name = self.get_vmname()
try:
for vmid in self.service().perform_find_duplicated_machines(name, self.get_unique_id()):
@ -536,7 +536,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
If you override this method, you should take care yourself of removing duplicated machines
(maybe only calling "remove_duplicated_name" method)
"""
self.remove_duplicated_name()
self.remove_duplicated_names()
@abc.abstractmethod
def op_create(self) -> None:

View File

@ -329,7 +329,7 @@ def remove_duplicates_field(
order=order,
tooltip=_('If active, found duplicates vApps for this service will be removed'),
tab=tab,
old_field_name=old_field_name,
old_field_name=old_field_name or 'removeDuplicates',
)