1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Started Xen Tests for new implementation

This commit is contained in:
Adolfo Gómez García 2024-05-14 03:15:09 +02:00
parent 6d4d79f0b7
commit 850b5e35e1
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
13 changed files with 488 additions and 280 deletions

View File

@ -0,0 +1,321 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pyright: reportConstantRedefinition=false
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import functools
import random
import typing
import datetime
from unittest import mock
import uuid
from tests.utils import search_item_by_attr
from uds.core import types, environment
from uds.core.ui.user_interface import gui
from ...utils.autospec import autospec, AutoSpecMethodInfo
from uds.services.Xen import (
deployment,
provider,
service_fixed,
publication,
deployment_fixed,
service,
)
from uds.services.Xen.xen import types as xen_types, exceptions as xen_exceptions, client
DEF_TASK_INFO = xen_types.TaskInfo(
opaque_ref='OpaqueRef:12345678-1234-1234-1234-1234567890ab',
uuid='12345678-1234-1234-1234-1234567890ab',
name='test_task',
description='Test task description',
created=datetime.datetime(2024, 1, 1, 0, 0, 0),
finished=datetime.datetime(2024, 1, 1, 0, 0, 0),
status=xen_types.TaskStatus.SUCCESS,
result='Test task result',
progress=100,
)
DEF_SRS_INFO = [
xen_types.StorageInfo(
opaque_ref=f'OpaqueRef:12345678-1234-1234-1234-1234567890{i:02x}',
uuid=f'12345678-1234-1234-1234-1234567890{i:02x}',
name=f'test_sr{i:02x}',
description=f'Test SR description {i:02x}',
allowed_operations=[
xen_types.StorageOperations.VDI_CREATE,
xen_types.StorageOperations.VDI_CLONE,
xen_types.StorageOperations.VDI_SNAPSHOT,
xen_types.StorageOperations.VDI_DESTROY,
],
VDIs=[],
PBDs=[],
virtual_allocation=i * 1024 * 1024,
physical_utilisation=i * 1024 * 1024,
physical_size=i * 1024 * 1024 * 1024,
type='',
content_type='',
shared=True,
)
for i in range(8)
]
SRS_INFO = DEF_SRS_INFO
TASK_INFO = DEF_TASK_INFO
def initialize_defaults() -> None:
"""
Initialize default values for the module variables
"""
global TASK_INFO, SRS_INFO
TASK_INFO = DEF_TASK_INFO
SRS_INFO = DEF_SRS_INFO
T = typing.TypeVar('T')
def random_from_list(lst: list[T], *args: typing.Any, **kwargs: typing.Any) -> T:
"""
Returns a random VM
"""
return random.choice(lst)
# def set_vm_attr_by_id(attr: str, value: typing.Any, vmid: str) -> None:
# try:
# next(filter(lambda x: x.id == vmid, VMS)).__setattr__(attr, value)
# except StopIteration:
# raise az_exceptions.AzureNotFoundException(f'Item with id=="{vmid}" not found in list')
# def set_vm_attr(attr: str, value: typing.Any, _resource_group_name: str, name: str) -> None:
# try:
# next(filter(lambda x: x.name == name, VMS)).__setattr__(attr, value)
# except StopIteration:
# raise az_exceptions.AzureNotFoundException(f'Item with name=="{name}" not found in list')
def search_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwargs: typing.Any) -> T:
"""
Returns an item from a list of items
"""
try:
return search_item_by_attr(lst, attribute, value)
except ValueError:
raise xen_exceptions.XenNotFoundError(f'Item with {attribute}=="{value}" not found in list')
# Methods that returns None or "internal" methods are not tested
CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
AutoSpecMethodInfo(
client.XenClient.has_pool,
returns=True,
),
AutoSpecMethodInfo(
client.XenClient.get_pool_name,
returns='TEST_pool_NAME',
),
# Default login and logout, skip them
AutoSpecMethodInfo(
client.XenClient.check_login,
returns=True,
),
# Default test, skip it
AutoSpecMethodInfo(
client.XenClient.get_task_info,
returns=TASK_INFO,
),
AutoSpecMethodInfo(
client.XenClient.list_srs,
returns=SRS_INFO,
),
AutoSpecMethodInfo(
client.XenClient.get_sr_info,
returns=search_by_attr,
partial_args=(SRS_INFO, 'opaque_ref'),
),
]
PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
'host': 'test.example.com',
'username': 'root',
'password': 'some_test_password',
'concurrent_creation_limit': 18,
'concurrent_removal_limit': 7,
'macs_range': '02:99:00:00:00:00-02:AA:00:FF:FF:FF',
'verify_ssl': True,
'host_backup': 'test_backup.example.com',
}
SERVICE_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
'datastore': 'OpaqueRef:b2143f92-e234-445c-ad3a-8582f8b893b6',
'min_space_gb': 32,
'machine': 'OpaqueRef:dd22df22-f243-4ec4-8b6f-6f31942f4c58',
'network': 'OpaqueRef:e623a082-01b7-4d4f-a777-6689387e6fd9',
'memory': 256,
'shadow': 4,
'remove_duplicates': True,
'maintain_on_error': False,
'try_soft_shutdown': False,
'basename': 'xcpng8',
'lenname': 5,
}
SERVICE_FIXED_VALUES_DICT: gui.ValuesDictType = {
'token': 'TEST_TOKEN_XEN',
'folder': '/Fixed Pool',
'machines': [
'OpaqueRef:f2fa4939-9953-4d65-b5d0-153f867dad32',
'OpaqueRef:812d6b43-dadb-4b18-a749-aba6f2122d75',
],
'use_snapshots': True,
'randomize': True,
'prov_uuid': '',
}
def create_client_mock() -> mock.Mock:
"""
Create a mock of ProxmoxClient
"""
return autospec(client.XenClient, CLIENT_METHODS_INFO)
@contextlib.contextmanager
def patched_provider(
**kwargs: typing.Any,
) -> typing.Generator[provider.XenProvider, None, None]:
client = create_client_mock()
provider = create_provider(**kwargs)
with mock.patch.object(provider, '_api') as api:
api.return_value = client
yield provider
def create_provider(**kwargs: typing.Any) -> provider.XenProvider:
"""
Create a provider
"""
values = PROVIDER_VALUES_DICT.copy()
values.update(kwargs)
uuid_ = str(uuid.uuid4())
return provider.XenProvider(
environment=environment.Environment.private_environment(uuid), values=values, uuid=uuid_
)
def create_service_linked(
provider: typing.Optional[provider.XenProvider] = None, **kwargs: typing.Any
) -> service.XenLinkedService:
"""
Create a fixed service
"""
uuid_ = str(uuid.uuid4())
values = SERVICE_VALUES_DICT.copy()
values.update(kwargs)
return service.XenLinkedService(
environment=environment.Environment.private_environment(uuid_),
provider=provider or create_provider(),
values=values,
uuid=uuid_,
)
def create_service_fixed(
provider: typing.Optional[provider.XenProvider] = None, **kwargs: typing.Any
) -> service_fixed.XenFixedService:
"""
Create a fixed service
"""
uuid_ = str(uuid.uuid4())
values = SERVICE_FIXED_VALUES_DICT.copy()
values.update(kwargs)
return service_fixed.XenFixedService(
environment=environment.Environment.private_environment(uuid_),
provider=provider or create_provider(),
values=values,
uuid=uuid_,
)
def create_publication(
service: typing.Optional[service.XenLinkedService] = None,
**kwargs: typing.Any,
) -> 'publication.XenPublication':
"""
Create a publication
"""
uuid_ = str(uuid.uuid4())
return publication.XenPublication(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service_linked(**kwargs),
revision=1,
servicepool_name='servicepool_name',
uuid=uuid_,
)
def create_userservice_fixed(
service: typing.Optional[service_fixed.XenFixedService] = None,
) -> deployment_fixed.XenFixedUserService:
"""
Create a fixed user service, has no publication
"""
uuid_ = str(uuid.uuid4().hex)
return deployment_fixed.XenFixedUserService(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service_fixed(),
publication=None,
uuid=uuid_,
)
def create_userservice_linked(
service: typing.Optional[service.XenLinkedService] = None,
publication: typing.Optional['publication.XenPublication'] = None,
) -> deployment.XenLinkedUserService:
"""
Create a linked user service
"""
uuid_ = str(uuid.uuid4())
return deployment.XenLinkedUserService(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service_linked(),
publication=publication or create_publication(),
uuid=uuid_,
)

View File

@ -33,47 +33,28 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import pickle
import typing
from uds.core.environment import Environment
from uds.core import types
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.environment import Environment
from . import fixtures
from uds.services.Xen.deployment import Operation as Operation, XenLinkedDeployment as Deployment
from uds.services.Xen.deployment import OldOperation, XenLinkedUserService as Deployment
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# vals = data.split(b'\1')
# logger.debug('Values: %s', vals)
# if vals[0] == b'v1':
# self._name = vals[1].decode('utf8')
# self._ip = vals[2].decode('utf8')
# self._mac = vals[3].decode('utf8')
# self._vmid = vals[4].decode('utf8')
# self._reason = vals[5].decode('utf8')
# self._queue = pickle.loads(vals[6]) # nosec: not insecure, we are loading our own data
# self._task = vals[7].decode('utf8')
# self.flag_for_upgrade() # Force upgrade
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
'_task',
}
TEST_QUEUE: typing.Final[list[Operation]] = [
Operation.CREATE,
Operation.REMOVE,
Operation.RETRY,
TEST_QUEUE_OLD: typing.Final[list[OldOperation]] = [
OldOperation.CREATE,
OldOperation.REMOVE,
OldOperation.RETRY,
]
TEST_QUEUE: typing.Final[list[types.services.Operation]] = [i.as_operation() for i in TEST_QUEUE_OLD]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE, protocol=0) + b'\x01task',
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01'
+ pickle.dumps(TEST_QUEUE_OLD, protocol=0)
+ b'\x01task',
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
@ -122,10 +103,10 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase):
# queue is kept on "storage", so we need always same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.save_pickled('queue', TEST_QUEUE)
environment.storage.save_pickled('queue', TEST_QUEUE_OLD)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None) # type: ignore # service is not used
instance = fixtures.create_userservice_linked()
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
@ -134,26 +115,26 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase):
self.assertEqual(instance._queue, TEST_QUEUE)
instance._queue = [
Operation.CREATE,
Operation.FINISH,
types.services.Operation.CREATE,
types.services.Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
[types.services.Operation.CREATE, types.services.Operation.FINISH],
)
# Append something remarshall and check
instance._queue.insert(0, Operation.RETRY)
instance._queue.insert(0, types.services.Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
types.services.Operation.RETRY,
types.services.Operation.CREATE,
types.services.Operation.FINISH,
],
)
# Remove something remarshall and check
@ -162,13 +143,5 @@ class XenDeploymentSerializationTest(UDSTransactionTestCase):
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
[types.services.Operation.CREATE, types.services.Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, this tests will warn us about it to fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=None) # type: ignore # service is not used
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@ -30,62 +30,32 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core import types
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.Xen.publication import XenPublication as Publication
from uds.services.Xen.publication import XenPublication as XenPublication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# # logger.debug('Data: {0}'.format(data))
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._reason,
# destroy_after,
# self._template_id,
# self._state,
# self._task,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after == 't'
# self.flag_for_upgrade() # Force upgrade asap
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_reason',
'_destroy_after',
'_template_id',
'_state',
'_task',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\tt\ttemplate_id\tstate\ttask'
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\tt\ttemplate_id\tok\ttask'
class XenPublicationSerializationTest(UDSTestCase):
def check(self, instance: Publication) -> None:
def check(self, instance: XenPublication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._reason, 'reason')
self.assertTrue(instance._destroy_after)
self.assertEqual(instance._template_id, 'template_id')
self.assertEqual(instance._state, 'state')
self.assertTrue(instance._is_flagged_for_destroy)
self.assertEqual(instance._vmid, 'template_id')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._queue, [types.services.Operation.CREATE, types.services.Operation.FINISH])
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = Publication(environment=environment, service=None) # type: ignore
instance = XenPublication(environment=environment, service=None) # type: ignore
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure remarshalled flag is set
@ -97,16 +67,9 @@ class XenPublicationSerializationTest(UDSTestCase):
# Ensure fields has been marshalled using new format
self.assertFalse(marshaled_data.startswith(b'\1'))
# Reunmarshall again and check that remarshalled flag is not set
instance = Publication(environment=environment, service=None) # type: ignore
instance = XenPublication(environment=environment, service=None) # type: ignore
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, this tests will warn us about it to fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Publication(environment=env, service=None) # type: ignore
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@ -30,6 +30,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import typing
import functools
import dataclasses
from unittest import mock
@ -37,6 +38,8 @@ from unittest import mock
class AutoSpecMethodInfo:
name: str|typing.Callable[..., typing.Any]
returns: typing.Any = None # Can be a callable or a value
partial_args: typing.Tuple[typing.Any, ...] = ()
partial_kwargs: dict[str, typing.Any] = dataclasses.field(default_factory=dict)
def autospec(cls: type, metods_info: collections.abc.Iterable[AutoSpecMethodInfo], **kwargs: typing.Any) -> mock.Mock:
@ -55,7 +58,8 @@ def autospec(cls: type, metods_info: collections.abc.Iterable[AutoSpecMethodInfo
name = method_info.name if isinstance(method_info.name, str) else method_info.name.__name__
mck = getattr(obj, name)
if callable(method_info.returns):
mck.side_effect = method_info.returns
mck.side_effect = functools.partial(method_info.returns, *method_info.partial_args, **method_info.partial_kwargs)
#mck.side_effect = method_info.returns
else:
mck.return_value = method_info.returns

View File

@ -90,7 +90,7 @@ class OldOperation(enum.IntEnum):
}.get(self, types.services.Operation.UNKNOWN)
class XenLinkedDeployment(DynamicUserService, autoserializable.AutoSerializable):
class XenLinkedUserService(DynamicUserService, autoserializable.AutoSerializable):
_task = autoserializable.StringField(default='')
def initialize(self) -> None:
@ -118,38 +118,17 @@ class XenLinkedDeployment(DynamicUserService, autoserializable.AutoSerializable)
self._vmid = vals[4].decode('utf8')
self._reason = vals[5].decode('utf8')
self._queue = [
i.as_operation() for i in pickle.loads(vals[6])
i.as_operation() for i in typing.cast(list[OldOperation], pickle.loads(vals[6]))
] # nosec: not insecure, we are loading our own data
self._task = vals[7].decode('utf8')
self.mark_for_upgrade() # Force upgrade
def _init_queue_for_deployment(self, cache_l2: bool = False) -> None:
if cache_l2 is False:
self._queue = [
OldOperation.CREATE,
OldOperation.CONFIGURE,
OldOperation.PROVISION,
OldOperation.START,
OldOperation.FINISH,
]
else:
self._queue = [
OldOperation.CREATE,
OldOperation.CONFIGURE,
OldOperation.PROVISION,
OldOperation.START,
OldOperation.WAIT,
OldOperation.WAIT_SUSPEND,
OldOperation.SUSPEND,
OldOperation.FINISH,
]
def op_create(self) -> None:
"""
Deploys a machine from template for user/cache
"""
template_id = self.publication().getTemplateId()
template_id = self.publication().get_template_id()
name = self.get_name()
if name == consts.NO_MORE_NAMES:
raise Exception(

View File

@ -34,11 +34,9 @@ import logging
import typing
from uds.core import types
from uds.core.services.generics.fixed.userservice import FixedUserService, Operation
from uds.core.services.generics.fixed.userservice import FixedUserService
from uds.core.util import autoserializable
from .xen import types as xen_types, exceptions as xen_exceptions
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from . import service_fixed
@ -65,25 +63,6 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
def service(self) -> 'service_fixed.XenFixedService':
return typing.cast('service_fixed.XenFixedService', super().service())
def set_ready(self) -> types.states.TaskState:
if self.cache.get('ready') == '1':
return types.states.TaskState.FINISHED
try:
state = self.service().get_machine_power_state(self._vmid)
if state != xen_types.PowerState.RUNNING:
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
self.cache.put('ready', '1', 30)
except Exception as e:
# On case of exception, log an an error and return as if the operation was executed
self.do_log(types.log.LogLevel.ERROR, 'Error setting machine state: {}'.format(e))
# return self.__error('Machine is not available anymore')
return types.states.TaskState.FINISHED
def reset(self) -> types.states.TaskState:
if self._vmid:
self.service().reset_machine(self._vmid) # Reset in sync
@ -94,41 +73,24 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
return types.states.TaskState.FINISHED
def op_start(self) -> None:
try:
state = self.service().get_machine_power_state(self._vmid)
except Exception as e:
raise Exception('Machine not found on start machine') from e
if state != xen_types.PowerState.RUNNING:
self._task = self.service().start_vm(self._vmid) or ''
self._task = self.service().start_vm(self._vmid)
def op_stop(self) -> None:
try:
state = self.service().get_machine_power_state(self._vmid)
except Exception as e:
raise Exception('Machine not found on stop machine') from e
if state == xen_types.PowerState.RUNNING:
logger.debug('Stopping machine %s', self._vmid)
self._task = self.service().stop_vm(self._vmid) or ''
self._task = self.service().stop_vm(self._vmid)
# Check methods
def _check_task_finished(self) -> types.states.TaskState:
if self._task == '':
return types.states.TaskState.FINISHED
try:
finished, _per = self.service().check_task_finished(self._task)
except xen_exceptions.XenFailure:
return types.states.TaskState.RUNNING # Try again later
except Exception as e: # Failed for some other reason
if isinstance(e.args[0], dict) and 'error_connection' in e.args[0]:
return types.states.TaskState.RUNNING # Try again later
raise e
if finished:
return types.states.TaskState.FINISHED
with self.service().provider().get_connection() as api:
task_info = api.get_task_info(self._task)
if task_info.is_failure():
raise Exception(task_info.result) # Will set error state
if task_info.is_success():
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
# Check methods

View File

@ -47,14 +47,18 @@ def get_machines(parameters: typing.Any) -> types.ui.CallbackResultType:
XenProvider, models.Provider.objects.get(uuid=parameters['prov_uuid']).get_instance()
)
try:
machines = [m for m in provider.get_machines_from_folder(parameters['folder'], retrieve_names=True) if not m.get('name', '').startswith('UDS')]
except Exception:
return []
with provider.get_connection() as api:
try:
vms = sorted(
[m for m in api.list_vms_from_folder(parameters['folder']) if not m.name.startswith('UDS')],
key=lambda x: x.name,
)
except Exception:
return []
return [
{
'name': 'machines',
'choices': [gui.choice_item(machine['id'], machine['name']) for machine in machines],
'choices': [gui.choice_item(vm.opaque_ref, vm.name) for vm in vms],
}
]

View File

@ -137,19 +137,19 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
old_field_name='hostBackup',
)
_cached_api: typing.Optional[client.XenServer]
_cached_api: typing.Optional[client.XenClient]
_use_count: int = 0
# XenServer engine, right now, only permits a connection to one server and only one per instance
# If we want to connect to more than one server, we need keep locked access to api, change api server, etc..
# We have implemented an "exclusive access" client that will only connect to one server at a time (using locks)
# and this way all will be fine
def _api(self) -> client.XenServer:
def _api(self) -> client.XenClient:
"""
Returns the connection API object for XenServer (using XenServersdk)
"""
if not self._cached_api:
self._cached_api = client.XenServer(
self._cached_api = client.XenClient(
self.host.value,
self.host_backup.value,
443,
@ -162,7 +162,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
return self._cached_api
@contextlib.contextmanager
def get_connection(self) -> typing.Iterator[client.XenServer]:
def get_connection(self) -> typing.Iterator[client.XenClient]:
"""
Context manager for XenServer API
"""

View File

@ -77,87 +77,44 @@ class XenPublication(DynamicPublication, autoserializable.AutoSerializable):
raise ValueError('Invalid data format')
self._is_flagged_for_destroy = destroy_after == 't'
if state == 'finished':
self._set_queue([types.services.Operation.FINISH])
elif state == 'error':
self._set_queue([types.services.Operation.ERROR])
else: # Running
self._set_queue([types.services.Operation.CREATE, types.services.Operation.FINISH])
self._queue
self.mark_for_upgrade() # Force upgrade asap
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service
"""
def op_create(self) -> None:
self._name = self.service().sanitized_name(
'UDS Pub ' + self.servicepool_name() + "-" + str(self.revision())
)
comments = _('UDS pub for {0} at {1}').format(
self.servicepool_name(), str(datetime.now()).split('.')[0]
)
self._reason = '' # No error, no reason for it
self._destroy_after = False
self._state = 'ok'
try:
self._task = self.service().start_deploy_of_template(self._name, comments)
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.TaskState.ERROR
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.TaskState:
self._task = self.service().start_deploy_of_template(self._name, comments)
def op_create_checker(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
if self._state == 'finished':
return types.states.TaskState.FINISHED
if self._state == 'error':
return types.states.TaskState.ERROR
try:
state, result = self.service().check_task_finished(self._task)
if state: # Finished
self._state = 'finished'
self._vmid = result
if self._destroy_after:
self._destroy_after = False
return self.destroy()
with self.service().provider().get_connection() as api:
task_info = api.get_task_info(self._task)
if task_info.is_done():
self._vmid = task_info.result
self.service().convert_to_template(self._vmid)
return types.states.TaskState.FINISHED
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.TaskState.ERROR
return types.states.TaskState.RUNNING
def error_reason(self) -> str:
return self._reason
def destroy(self) -> types.states.TaskState:
# We do not do anything else to destroy this instance of publication
if self._state == 'ok':
self._destroy_after = True
return types.states.TaskState.RUNNING
try:
self.service().remove_template(self._vmid)
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.TaskState.ERROR
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.TaskState:
return self.destroy()
# Here ends the publication needed methods.
# Methods provided below are specific for this publication type
# and will be used by user deployments that uses this kind of publication
def getTemplateId(self) -> str:
def get_template_id(self) -> str:
"""
Returns the template id associated with the publication
"""

View File

@ -39,7 +39,7 @@ from uds.core.util import validators
from uds.core.ui import gui
from .publication import XenPublication
from .deployment import XenLinkedDeployment
from .deployment import XenLinkedUserService
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -93,7 +93,7 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
# : In our case, we do no need a publication, so this is None
publication_type = XenPublication
# : Types of deploys (services in cache and/or assigned to users)
user_service_type = XenLinkedDeployment
user_service_type = XenLinkedUserService
services_type_provided = types.services.ServiceType.VDI

View File

@ -54,13 +54,13 @@ logger = logging.getLogger(__name__)
class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
"""
Represents a Proxmox service based on fixed machines.
Represents a Xen service based on fixed machines.
This service requires the qemu agent to be installed on the machines.
"""
type_name = _('Proxmox Fixed Machines')
type_type = 'ProxmoxFixedService'
type_description = _('Proxmox Services based on fixed machines. Needs qemu agent installed on machines.')
type_name = _('Xen Fixed Machines')
type_type = 'XenFixedService'
type_description = _('Xen Services based on fixed machines. Needs xen agent installed on machines.')
icon_file = 'service.png'
can_reset = True
@ -88,7 +88,7 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
},
tooltip=_('Folder containing base machines'),
required=True,
tab=_('Machines'),
tab=types.ui.Tab.MACHINE,
old_field_name='resourcePool',
)
machines = FixedService.machines
@ -110,7 +110,7 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
def provider(self) -> 'XenProvider':
return typing.cast('XenProvider', super().provider())
def start_vm(self, vmid: str) -> typing.Optional[str]:
def start_vm(self, vmid: str) -> str:
"""
Tries to start a machine. No check is done, it is simply requested to Xen.
@ -122,9 +122,11 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
with self.provider().get_connection() as api:
api.start_vm(vmid)
if api.get_vm_info(vmid).power_state.is_running():
return '' # Already running
return api.start_vm(vmid)
def stop_vm(self, vmid: str) -> typing.Optional[str]:
def stop_vm(self, vmid: str) -> str:
"""
Tries to stop a machine. No check is done, it is simply requested to Xen
@ -134,9 +136,13 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
with self.provider().get_connection() as api:
return api.stop_vm(vmid)
if api.get_vm_info(vmid).power_state.is_running():
return api.stop_vm(vmid)
return '' # Already stopped
def reset_machine(self, vmid: str) -> typing.Optional[str]:
def reset_machine(self, vmid: str) -> str:
"""
Tries to stop a machine. No check is done, it is simply requested to Xen
@ -146,12 +152,17 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
with self.provider().get_connection() as api:
return api.reset_vm(vmid)
if api.get_vm_info(vmid).power_state.is_running():
return api.reset_vm(vmid)
return '' # Already stopped, no reset needed
def shutdown_machine(self, vmid: str) -> typing.Optional[str]:
def shutdown_machine(self, vmid: str) -> str:
with self.provider().get_connection() as api:
return api.shutdown_vm(vmid)
return self.provider().shutdown_machine(vmid)
if api.get_vm_info(vmid).power_state.is_running():
return api.shutdown_vm(vmid)
return '' # Already stopped
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
def is_avaliable(self) -> bool:

View File

@ -48,11 +48,11 @@ TAG_TEMPLATE = "uds-template"
TAG_MACHINE = "uds-machine"
def cache_key_helper(server_api: 'XenServer') -> str:
def cache_key_helper(server_api: 'XenClient') -> str:
return server_api._url # pyright: ignore[reportPrivateUsage]
class XenServer: # pylint: disable=too-many-public-methods
class XenClient: # pylint: disable=too-many-public-methods
_originalHost: str
_host: str
_host_backup: str
@ -216,12 +216,6 @@ class XenServer: # pylint: disable=too-many-public-methods
def test(self) -> None:
self.login(False)
def get_host(self) -> str:
return self._host
def set_host(self, host: str) -> None:
self._host = host
def get_task_info(self, task_opaque_ref: str) -> xen_types.TaskInfo:
try:
task_info = xen_types.TaskInfo.from_dict(self.task.get_record(task_opaque_ref), task_opaque_ref)
@ -229,7 +223,7 @@ class XenServer: # pylint: disable=too-many-public-methods
if e.details[0] == 'HANDLE_INVALID':
return xen_types.TaskInfo.unknown_task(task_opaque_ref)
raise exceptions.XenFailure(e.details)
return task_info
@cached(prefix='xen_srs', timeout=consts.cache.DEFAULT_CACHE_TIMEOUT, key_helper=cache_key_helper)
@ -290,7 +284,7 @@ class XenServer: # pylint: disable=too-many-public-methods
except XenAPI.Failure as e:
raise exceptions.XenFailure(e.details)
def start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_running():
return None
@ -300,32 +294,32 @@ class XenServer: # pylint: disable=too-many-public-methods
return (self.Async if as_async else self).VM.start(vm_opaque_ref, False, False)
def stop_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _stop_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.hard_shutdown(vm_opaque_ref)
def reset_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _reset_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.hard_reboot(vm_opaque_ref)
def suspend_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _suspend_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
if vminfo.supports_suspend() is False:
# Shutdown machine if it can't be suspended
return self.shutdown_vm(vm_opaque_ref, as_async)
return self._shutdown_vm(vm_opaque_ref, as_async)
return (self.Async if as_async else self).VM.suspend(vm_opaque_ref)
def resume_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _resume_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_running():
return None
@ -335,12 +329,48 @@ class XenServer: # pylint: disable=too-many-public-methods
return (self.Async if as_async else self).VM.resume(vm_opaque_ref, False, False)
def shutdown_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
def _shutdown_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.clean_shutdown(vm_opaque_ref)
def start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._start_vm(vm_opaque_ref, as_async)) # We know it's not None on async
def start_vm_sync(self, vm_opaque_ref: str) -> None:
self._start_vm(vm_opaque_ref, False)
def stop_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._stop_vm(vm_opaque_ref, as_async))
def stop_vm_sync(self, vm_opaque_ref: str) -> None:
self._stop_vm(vm_opaque_ref, False)
def reset_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._reset_vm(vm_opaque_ref, as_async))
def reset_vm_sync(self, vm_opaque_ref: str) -> None:
self._reset_vm(vm_opaque_ref, False)
def suspend_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._suspend_vm(vm_opaque_ref, as_async))
def suspend_vm_sync(self, vm_opaque_ref: str) -> None:
self._suspend_vm(vm_opaque_ref, False)
def resume_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._resume_vm(vm_opaque_ref, as_async))
def resume_vm_sync(self, vm_opaque_ref: str) -> None:
self._resume_vm(vm_opaque_ref, False)
def shutdown_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._shutdown_vm(vm_opaque_ref, as_async))
def shutdown_vm_sync(self, vm_opaque_ref: str) -> None:
self._shutdown_vm(vm_opaque_ref, False)
def clone_vm(self, vm_opaque_ref: str, target_name: str, target_sr: typing.Optional[str] = None) -> str:
"""
If target_sr is NONE:
@ -510,9 +540,7 @@ class XenServer: # pylint: disable=too-many-public-methods
raise exceptions.XenFailure(e.details)
@cached(prefix='xen_snapshots', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_snapshots(
self, vmid: str, full_info: bool = True, **kwargs: typing.Any
) -> list[xen_types.VMInfo]:
def list_snapshots(self, vmid: str, full_info: bool = True, **kwargs: typing.Any) -> list[xen_types.VMInfo]:
"""Returns a list of snapshots for the specified VM, sorted by snapshot_time in descending order.
(That is, the most recent snapshot is first in the list.)
@ -525,15 +553,16 @@ class XenServer: # pylint: disable=too-many-public-methods
"""
try:
snapshots = self.VM.get_snapshots(vmid)
if not full_info:
return [xen_types.VMInfo.empty(snapshot) for snapshot in snapshots]
# Return full info, thatis, name, id and snapshot_time
return sorted([
self.get_vm_info(snapshot)
for snapshot in snapshots
], key=lambda x: x.snapshot_time, reverse=True)
return sorted(
[self.get_vm_info(snapshot) for snapshot in snapshots],
key=lambda x: x.snapshot_time,
reverse=True,
)
except XenAPI.Failure as e:
raise exceptions.XenFailure(e.details)

View File

@ -38,6 +38,7 @@ from uds.core.services.generics import exceptions
logger = logging.getLogger(__name__)
class XenFault(exceptions.Error):
pass
@ -90,3 +91,7 @@ class XenException(XenFault):
logger.debug('Exception create: %s', message)
class XenNotFoundError(XenException, exceptions.NotFoundError):
def __init__(self, message: typing.Any):
XenException.__init__(self, message)
logger.debug('Not found exception create: %s', message)