1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Fix OpenStack and Proxmox service issues. Finished tests for OpenStack.

This commit is contained in:
Adolfo Gómez García 2024-03-13 02:11:45 +01:00
parent 7be200f173
commit 7ecadee56f
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
11 changed files with 399 additions and 81 deletions

View File

@ -85,24 +85,28 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
label=_('Use snapshots'),
default=False,
order=22,
tooltip=_('If active, UDS will try to create an snapshot (if one already does not exists) before accessing a machine, and restore it after usage.'),
tooltip=_(
'If active, UDS will try to create an snapshot (if one already does not exists) before accessing a machine, and restore it after usage.'
),
tab=_('Machines'),
old_field_name='useSnapshots',
)
# This one replaces use_snapshots, and is used to select the snapshot type (No snapshot, recover snapshot and stop machine, recover snapshot and start machine)
snapshot_type = gui.ChoiceField(
label=_('Snapshot type'),
order=22,
default='0',
tooltip=_('If active, UDS will try to create an snapshot (if one already does not exists) before accessing a machine, and restore it after usage.'),
tooltip=_(
'If active, UDS will try to create an snapshot (if one already does not exists) before accessing a machine, and restore it after usage.'
),
tab=_('Machines'),
choices=[
gui.choice_item('no', _('No snapshot')),
gui.choice_item('stop', _('Recover snapshot and stop machine')),
gui.choice_item('start', _('Recover snapshot and start machine')),
],
)
)
# Keep name as "machine" so we can use VCHelpers.getMachines
machines = gui.MultiChoiceField(
@ -115,13 +119,15 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
)
def _get_assigned_machines(self) -> typing.Set[str]:
vals = self.storage.get_unpickle('vms')
with self.storage.as_dict() as d:
vals = d.get('vms', None)
logger.debug('Got storage VMS: %s', vals)
return vals or set()
def _save_assigned_machines(self, vals: typing.Set[str]) -> None:
logger.debug('Saving storage VMS: %s', vals)
self.storage.put_pickle('vms', vals)
with self.storage.as_dict() as d:
d['vms'] = vals
def process_snapshot(self, remove: bool, userservice_instance: 'FixedUserService') -> None:
"""
@ -169,8 +175,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
@abc.abstractmethod
def get_guest_ip_address(self, vmid: str) -> str:
"""Returns the guest ip address of the machine
"""
"""Returns the guest ip address of the machine"""
raise NotImplementedError()
@abc.abstractmethod

View File

@ -87,6 +87,9 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
_task = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]() # Default is empty list
# Note that even if SNAPHSHOT operations are in middel
# implementations may opt to no have snapshots at all
# In this case, the process_snapshot method will do nothing
_create_queue: typing.ClassVar[list[Operation]] = [
Operation.CREATE,
Operation.SNAPSHOT_CREATE,
@ -210,27 +213,13 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
fncs: dict[Operation, collections.abc.Callable[[], None]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
Operation.STOP: self._stop_machine,
Operation.WAIT: self._wait,
Operation.REMOVE: self._remove,
Operation.SNAPSHOT_CREATE: self._snapshot_create,
Operation.SNAPSHOT_RECOVER: self._snapshot_recover,
Operation.PROCESS_TOKEN: self._process_token,
Operation.SOFT_SHUTDOWN: self._soft_shutdown_machine,
Operation.NOP: self._nop,
}
try:
operation_runner: typing.Optional[collections.abc.Callable[[], None]] = fncs.get(op, None)
operation_runner = _EXEC_FNCS.get(op, None)
if not operation_runner:
return self._error(f'Unknown operation found at execution queue ({op})')
operation_runner()
operation_runner(self)
return types.states.TaskState.RUNNING
except Exception as e:
@ -386,27 +375,13 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
FNCS: typing.Final[dict[Operation, collections.abc.Callable[[], types.states.TaskState]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry_checker,
Operation.WAIT: self._wait_checker,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.REMOVE: self._removed_checker,
Operation.SNAPSHOT_CREATE: self._snapshot_create_checker,
Operation.SNAPSHOT_RECOVER: self._snapshot_recover_checker,
Operation.PROCESS_TOKEN: self._process_token_checker,
Operation.SOFT_SHUTDOWN: self._soft_shutdown_checker,
Operation.NOP: self._nop_checker,
}
try:
check_function: typing.Optional[collections.abc.Callable[[], types.states.TaskState]] = FNCS.get(op, None)
check_function = _CHECK_FNCS.get(op, None)
if check_function is None:
return self._error('Unknown operation found at check queue ({0})'.format(op))
state = check_function()
state = check_function(self)
if state == types.states.TaskState.FINISHED:
self._pop_current_op() # Remove runing op, till now only was "peek"
return self._execute_queue()
@ -487,3 +462,34 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
self._vmid,
self._task,
)
_EXEC_FNCS: typing.Final[collections.abc.Mapping[Operation, collections.abc.Callable[[FixedUserService], None]]] = {
Operation.CREATE: FixedUserService._create,
Operation.RETRY: FixedUserService._retry,
Operation.START: FixedUserService._start_machine,
Operation.STOP: FixedUserService._stop_machine,
Operation.WAIT: FixedUserService._wait,
Operation.REMOVE: FixedUserService._remove,
Operation.SNAPSHOT_CREATE: FixedUserService._snapshot_create,
Operation.SNAPSHOT_RECOVER: FixedUserService._snapshot_recover,
Operation.PROCESS_TOKEN: FixedUserService._process_token,
Operation.SOFT_SHUTDOWN: FixedUserService._soft_shutdown_machine,
Operation.NOP: FixedUserService._nop,
}
_CHECK_FNCS: typing.Final[collections.abc.Mapping[Operation, collections.abc.Callable[[FixedUserService], types.states.TaskState]]] = {
Operation.CREATE: FixedUserService._create_checker,
Operation.RETRY: FixedUserService._retry_checker,
Operation.WAIT: FixedUserService._wait_checker,
Operation.START: FixedUserService._start_checker,
Operation.STOP: FixedUserService._stop_checker,
Operation.REMOVE: FixedUserService._removed_checker,
Operation.SNAPSHOT_CREATE: FixedUserService._snapshot_create_checker,
Operation.SNAPSHOT_RECOVER: FixedUserService._snapshot_recover_checker,
Operation.PROCESS_TOKEN: FixedUserService._process_token_checker,
Operation.SOFT_SHUTDOWN: FixedUserService._soft_shutdown_checker,
Operation.NOP: FixedUserService._nop_checker,
}

View File

@ -60,6 +60,9 @@ class OpenStackUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
# : Recheck every ten seconds by default (for task methods)
suggested_delay = 4
# Override _assign_queue
# Utility overrides for type checking...
def service(self) -> 'service_fixed.OpenStackServiceFixed':
@ -70,11 +73,11 @@ class OpenStackUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
return types.states.TaskState.FINISHED
try:
vminfo = self.service().get_machine_info(self._vmid)
server_info = self.service().api.get_server(self._vmid)
except Exception as e:
return self._error(f'Machine not found: {e}')
if vminfo.status == 'stopped':
if server_info.status == 'stopped':
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
@ -99,10 +102,10 @@ class OpenStackUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
def _start_machine(self) -> None:
try:
vminfo = self.service().get_machine_info(self._vmid)
server_info = self.service().api.get_server(self._vmid)
except Exception as e:
raise Exception('Machine not found on start machine') from e
if vminfo.power_state != openstack_types.PowerState.RUNNING:
if server_info.power_state != openstack_types.PowerState.RUNNING:
self.service().api.start_server(self._vmid) # Start the server

View File

@ -37,11 +37,10 @@ import typing
from django.utils.translation import gettext_noop as _
from uds.core import environment, types, consts
from uds.core import environment, types
from uds.core.services import ServiceProvider
from uds.core.ui import gui
from uds.core.util import validators, fields
from uds.core.util.decorators import cached
from .openstack import openstack_client, sanitized_name, types as openstack_types
from .service import OpenStackLiveService
@ -246,7 +245,6 @@ class OpenStackProviderLegacy(ServiceProvider):
"""
return OpenStackProviderLegacy(env, data).test_connection()
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
def is_available(self) -> bool:
"""
Check if aws provider is reachable

View File

@ -47,7 +47,7 @@ from .deployment_fixed import OpenStackUserServiceFixed
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds import models
from .openstack import openstack_client, types as openstack_types
from .openstack import openstack_client
from .provider import OpenStackProvider
from .provider_legacy import OpenStackProviderLegacy
@ -157,9 +157,6 @@ class OpenStackServiceFixed(FixedService): # pylint: disable=too-many-public-me
def provider(self) -> 'AnyOpenStackProvider':
return typing.cast('AnyOpenStackProvider', super().provider())
def get_machine_info(self, vmid: str) -> 'openstack_types.ServerInfo':
return self.api.get_server(vmid)
def is_avaliable(self) -> bool:
return self.provider().is_available()
@ -168,23 +165,25 @@ class OpenStackServiceFixed(FixedService): # pylint: disable=too-many-public-me
servers = {server.id:server.name for server in self.api.list_servers() if not server.name.startswith('UDS-')}
assigned_servers = self._get_assigned_machines()
# Only machines not assigned, and that exists on openstack will be available
return [
gui.choice_item(k, servers.get(k, 'Unknown!'))
gui.choice_item(k, servers[k])
for k in self.machines.as_list()
if k not in assigned_servers
and k in servers
]
def assign_from_assignables(
self, assignable_id: str, user: 'models.User', userservice_instance: 'services.UserService'
) -> types.states.TaskState:
proxmox_service_instance = typing.cast(OpenStackUserServiceFixed, userservice_instance)
openstack_userservice_instance = typing.cast(OpenStackUserServiceFixed, userservice_instance)
assigned_vms = self._get_assigned_machines()
if assignable_id not in assigned_vms:
assigned_vms.add(assignable_id)
self._save_assigned_machines(assigned_vms)
return proxmox_service_instance.assign(assignable_id)
return openstack_userservice_instance.assign(assignable_id)
return proxmox_service_instance.error('VM not available!')
return openstack_userservice_instance.error('VM not available!')
def process_snapshot(self, remove: bool, userservice_instance: FixedUserService) -> None:
return # No snapshots support
@ -198,7 +197,7 @@ class OpenStackServiceFixed(FixedService): # pylint: disable=too-many-public-me
try:
# Invoke to check it exists, do not need to store the result
if self.api.get_server(checking_vmid).status.is_lost():
raise Exception('Machine not found') # Process on except
raise Exception('Machine not found') # Simply translate is_lost to an exception
found_vmid = checking_vmid
break
except Exception: # Notifies on log, but skipt it
@ -220,7 +219,7 @@ class OpenStackServiceFixed(FixedService): # pylint: disable=too-many-public-me
if not found_vmid:
raise Exception('All machines from list already assigned.')
return str(found_vmid)
return found_vmid
def get_first_network_mac(self, vmid: str) -> str:
return self.api.get_server(vmid).addresses[0].mac

View File

@ -43,19 +43,29 @@ from uds.core.ui.user_interface import gui
from ...utils.autospec import autospec, AutoSpecMethodInfo
from uds.services.OpenStack import provider, provider_legacy, service, publication, deployment
from uds.services.OpenStack import (
provider,
provider_legacy,
service,
publication,
deployment,
service_fixed,
deployment_fixed,
)
from uds.services.OpenStack.openstack import openstack_client, types as openstack_types
AnyOpenStackProvider: typing.TypeAlias = typing.Union[provider.OpenStackProvider, provider_legacy.OpenStackProviderLegacy]
AnyOpenStackProvider: typing.TypeAlias = typing.Union[
provider.OpenStackProvider, provider_legacy.OpenStackProviderLegacy
]
GUEST_IP_ADDRESS: str = '1.0.0.1'
FLAVORS_LIST: list[openstack_types.FlavorInfo] = [
openstack_types.FlavorInfo(
id='ai2.xlarge.4',
name='ai2.xlarge.4',
vcpus=4,
id=f'fid{n}',
name=f'Flavor name{n}',
vcpus=n,
ram=1024 * n, # MiB
disk=1024 * 1024 * n, # GiB
swap=0,
@ -343,6 +353,14 @@ SERVICE_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
# 'prov_uuid': str(uuid.uuid4()), # Not stored on db, so not needed
}
SERVICES_FIXED_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
'token': 'token',
'region': random.choice(REGIONS_LIST).id,
'project': random.choice(PROJECTS_LIST).id,
'machines': [i.id for i in random.sample(SERVERS_LIST, 4)],
# 'prov_uuid': str(uuid.uuid4()), # Not stored on db, so not needed
}
def create_client_mock() -> mock.Mock:
"""
@ -395,9 +413,7 @@ def create_provider_legacy(**kwargs: typing.Any) -> provider_legacy.OpenStackPro
)
def create_live_service(
provider: AnyOpenStackProvider, **kwargs: typing.Any
) -> service.OpenStackLiveService:
def create_live_service(provider: AnyOpenStackProvider, **kwargs: typing.Any) -> service.OpenStackLiveService:
"""
Create a service
"""
@ -411,7 +427,8 @@ def create_live_service(
values=values,
uuid=uuid_,
)
def create_publication(service: service.OpenStackLiveService) -> publication.OpenStackLivePublication:
"""
Create a publication
@ -425,6 +442,7 @@ def create_publication(service: service.OpenStackLiveService) -> publication.Ope
uuid=uuid_,
)
def create_live_userservice(
service: service.OpenStackLiveService,
publication: typing.Optional[publication.OpenStackLivePublication] = None,
@ -439,3 +457,38 @@ def create_live_userservice(
publication=publication or create_publication(service),
uuid=uuid_,
)
def create_fixed_service(
provider: AnyOpenStackProvider, **kwargs: typing.Any
) -> service_fixed.OpenStackServiceFixed:
"""
Create a fixed service
"""
values = SERVICES_FIXED_VALUES_DICT.copy()
values.update(kwargs)
uuid_ = str(uuid.uuid4())
return service_fixed.OpenStackServiceFixed(
provider=provider,
environment=environment.Environment.private_environment(uuid_),
values=values,
uuid=uuid_,
)
# Fixed has no publications
def create_fixed_userservice(
service: service_fixed.OpenStackServiceFixed,
) -> deployment_fixed.OpenStackUserServiceFixed:
"""
Create a linked user service
"""
uuid_ = str(uuid.uuid4())
return deployment_fixed.OpenStackUserServiceFixed(
environment=environment.Environment.private_environment(uuid_),
service=service,
uuid=uuid_,
)

View File

@ -30,8 +30,10 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
from unittest import mock
from uds import models
from uds.core import types, environment
from . import fixtures
@ -53,13 +55,15 @@ class TestOpenstackProvider(UDSTransactionTestCase):
self.assertEqual(provider.test_connection(), types.core.TestResult(True, mock.ANY))
# Ensure test_connection is called
client.test_connection.assert_called_once()
self.assertEqual(provider.is_available(), True)
client.is_available.assert_called_once()
# Clear mock calls
client.reset_mock()
OpenStackProvider.test(env=environment.Environment.testing_environment(), data=fixtures.PROVIDER_VALUES_DICT)
OpenStackProvider.test(
env=environment.Environment.testing_environment(), data=fixtures.PROVIDER_VALUES_DICT
)
def test_provider_legacy(self) -> None:
"""
@ -71,11 +75,57 @@ class TestOpenstackProvider(UDSTransactionTestCase):
self.assertEqual(provider.test_connection(), types.core.TestResult(True, mock.ANY))
# Ensure test_connection is called
client.test_connection.assert_called_once()
self.assertEqual(provider.is_available(), True)
client.is_available.assert_called_once()
# Clear mock calls
client.reset_mock()
OpenStackProviderLegacy.test(env=environment.Environment.testing_environment(), data=fixtures.PROVIDER_VALUES_DICT)
OpenStackProviderLegacy.test(
env=environment.Environment.testing_environment(), data=fixtures.PROVIDER_VALUES_DICT
)
def test_helpers(self) -> None:
"""
Test the Helpers. In fact, not used on provider, but on services (fixed, live, ...)
"""
from uds.services.OpenStack.helpers import get_machines, get_resources, get_volumes
for prov in (fixtures.create_provider_legacy(), fixtures.create_provider()):
with fixtures.patch_provider_api(legacy=prov.legacy) as _api:
# Ensure exists on db
db_provider = models.Provider.objects.create(
name='test proxmox provider',
comments='test comments',
data_type=prov.type_type,
data=prov.serialize(),
)
parameters: dict[str, str] = {
'prov_uuid': db_provider.uuid,
'project': random.choice(fixtures.PROJECTS_LIST).id,
'region': random.choice(fixtures.REGIONS_LIST).id,
}
# Test get_storage
h_machines = get_machines(parameters)
self.assertEqual(len(h_machines), 1)
self.assertEqual(h_machines[0]['name'], 'machines')
self.assertEqual(sorted(i['id'] for i in h_machines[0]['choices']), sorted(i.id for i in fixtures.SERVERS_LIST))
h_resources = get_resources(parameters)
# [{'name': 'availability_zone', 'choices': [...]}, {'name': 'network', 'choices': [...]}, {'name': 'flavor', 'choices': [...]}, {'name': 'security_groups', 'choices': [...]}]
self.assertEqual(len(h_resources), 4)
self.assertEqual(sorted(i['name'] for i in h_resources), ['availability_zone', 'flavor', 'network', 'security_groups'])
def _get_choices_for(name: str) -> list[str]:
return [i['id'] for i in next(i for i in h_resources if i['name'] == name)['choices']]
self.assertEqual(sorted(_get_choices_for('availability_zone')), sorted(i.id for i in fixtures.AVAILABILITY_ZONES_LIST))
self.assertEqual(sorted(_get_choices_for('network')), sorted(i.id for i in fixtures.NETWORKS_LIST))
self.assertEqual(sorted(_get_choices_for('flavor')), sorted(i.id for i in fixtures.FLAVORS_LIST if not i.disabled))
self.assertEqual(sorted(_get_choices_for('security_groups')), sorted(i.id for i in fixtures.SECURITY_GROUPS_LIST))
# [{'name': 'volume', 'choices': [...]}]
h_volumes = get_volumes(parameters)
self.assertEqual(len(h_volumes), 1)
self.assertEqual(h_volumes[0]['name'], 'volume')
self.assertEqual(sorted(i['id'] for i in h_volumes[0]['choices']), sorted(i.id for i in fixtures.VOLUMES_LIST))

View File

@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from uds import models
from uds.core import types
from . import fixtures
from ...utils.test import UDSTransactionTestCase
class TestProxmovFixedService(UDSTransactionTestCase):
def test_service_fixed(self) -> None:
"""
Test the service
"""
for prov in (fixtures.create_provider_legacy(), fixtures.create_provider()):
with fixtures.patch_provider_api(legacy=prov.legacy) as api:
service = fixtures.create_fixed_service(prov) # Will use provider patched api
userservice = fixtures.create_fixed_userservice(service)
# Test service is_available method
self.assertTrue(service.is_avaliable())
api.is_available.assert_called()
# assignables should be same as service.macines
assignables = set(i['id'] for i in service.enumerate_assignables())
self.assertEqual(assignables, set(service.machines.value), f'legacy={prov.legacy}')
# Remove one machine from fixtures servers_list (from the first one on service.machine.value)
prev_servers_list = fixtures.SERVERS_LIST.copy()
fixtures.SERVERS_LIST[:] = [
i for i in fixtures.SERVERS_LIST if i.id != service.machines.value[0]
]
# Now should not return the first from service.machines.value
assignables = set(i['id'] for i in service.enumerate_assignables())
self.assertEqual(assignables, set(set(service.machines.value[1:])), f'legacy={prov.legacy}')
# Restore servers_list and assignables
fixtures.SERVERS_LIST[:] = prev_servers_list
assignables = set(i['id'] for i in service.enumerate_assignables())
to_assign = list(assignables)[0]
# Assign one, and test it's not available anymore
# First time, it will run the queue, so we should receive a RUNNING
self.assertEqual(
service.assign_from_assignables(to_assign, models.User(), userservice),
types.states.TaskState.RUNNING,
)
# Now it's not on available list for any new user nor user service
self.assertEqual(
set(i['id'] for i in service.enumerate_assignables()) ^ assignables,
{list(assignables)[0]},
)
# If called again, should return types.states.TaskState.ERROR beause it's already assigned
self.assertEqual(
service.assign_from_assignables(to_assign, models.User(), userservice),
types.states.TaskState.ERROR,
)
# How many assignables machines are available?
remaining = len(list(service.enumerate_assignables()))
api.get_server.reset_mock()
# Now get_and_assign_machine as much as remaining machines
for _ in range(remaining):
vm = service.get_and_assign_machine()
self.assertIn(vm, assignables)
# enumarate_assignables should return an empty list now
self.assertEqual(list(service.enumerate_assignables()), [])
# And get_server should have been called remaining times
self.assertEqual(api.get_server.call_count, remaining)
# And a new try, should raise an exception
self.assertRaises(Exception, service.get_and_assign_machine)

View File

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from unittest import mock
from uds import models
from uds.core import types
from . import fixtures
from ...utils.test import UDSTransactionTestCase
from ...utils.generators import limited_iterator
# We use transactions on some related methods (storage access, etc...)
class TestProxmovLinkedService(UDSTransactionTestCase):
def test_userservice_fixed_user(self) -> None:
"""
Test the user service
"""
with fixtures.patch_provider_api() as _api:
userservice = fixtures.create_userservice_fixed()
service = userservice.service()
self.assertEqual(service._get_assigned_machines(), set())
# patch userservice db_obj() method to return a mock
userservice_db = mock.MagicMock()
userservice.db_obj = mock.MagicMock(return_value=userservice_db)
# Test Deploy for cache, should raise Exception due
# to the fact fixed services cannot have cached items
with self.assertRaises(Exception):
userservice.deploy_for_cache(level=1)
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
while state == types.states.TaskState.RUNNING:
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
# userservice_db Should have halle set_in_use(True)
userservice_db.set_in_use.assert_called_once_with(True)
# vmid should have been assigned, so it must be in the assigned machines
self.assertEqual(set(userservice._vmid), service._get_assigned_machines())
# Now, let's release the service
state = userservice.destroy()
self.assertEqual(state, types.states.TaskState.RUNNING)
while state == types.states.TaskState.RUNNING:
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
# must be empty now
self.assertEqual(service._get_assigned_machines(), set())
# set_ready, machine is "stopped" in this test, so must return RUNNING
state = userservice.set_ready()
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32):
state = userservice.check_state()
# Should be finished now
self.assertEqual(state, types.states.TaskState.FINISHED)

View File

@ -30,15 +30,9 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import datetime
import collections.abc
import itertools
from unittest import mock
from uds.core import types, ui, environment
from uds.services.OpenNebula.on.vm import remove_machine
from uds.services.Proxmox.publication import ProxmoxPublication
from uds.core import types
from . import fixtures

View File

@ -33,9 +33,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import typing
from unittest import mock
from uds.core import types, ui, environment
from uds.services.Proxmox.service_fixed import ProxmoxServiceFixed
from uds.core import types, ui
from . import fixtures
from ...utils.test import UDSTransactionTestCase