1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Adding fixtures for xen client api

This commit is contained in:
Adolfo Gómez García 2024-05-26 20:10:27 +02:00
parent 7281fa3494
commit f2c7afbd44
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 364 additions and 26 deletions

View File

@ -55,6 +55,9 @@ from uds.services.Xen import (
from uds.services.Xen.xen import types as xen_types, exceptions as xen_exceptions, client
DEF_POOL_NAME: typing.Final[str] = 'TEST_pool_NAME'
DEF_CHANGE_STATE_OPAQUE_REF: typing.Final[str] = 'OpaqueRef:12345678-cdef-abcd-1234-1234567890ab'
DEF_TASK_INFO = xen_types.TaskInfo(
opaque_ref='OpaqueRef:12345678-1234-1234-1234-1234567890ab',
uuid='12345678-1234-1234-1234-1234567890ab',
@ -91,20 +94,75 @@ DEF_SRS_INFO = [
for i in range(8)
]
SRS_INFO = DEF_SRS_INFO
DEF_NETWORKS_INFO = [
xen_types.NetworkInfo(
opaque_ref=f'OpaqueRef:12345678-1234-1234-1234-1234567890{i:02x}',
uuid=f'12345678-1234-1234-1234-1234567890{i:02x}',
name=f'test_network{i:02x}',
description=f'Test network description {i:02x}',
managed=True,
VIFs=[],
PIFs=[],
is_guest_installer_network=False,
is_host_internal_management_network=False,
ip_begin=f'10.0.0.{i}',
ip_end=f'10.0.0.{i + 1}',
netmask='255.255.0.0',
)
for i in range(8)
]
DEF_VMS_INFO = [
xen_types.VMInfo(
opaque_ref=f'OpaqueRef:12345678-1234-1234-1234-1234567890{i:02x}',
uuid=f'12345678-1234-1234-1234-1234567890{i:02x}',
name=f'test_vm{i:02x}',
description=f'Test VM description {i:02x}',
power_state=xen_types.PowerState.RUNNING,
is_control_domain=False,
is_a_template=False,
snapshot_time=datetime.datetime(2024, 1, 1, 0, 0, 0),
# For testing, snapshot refers to itself 3 times, just for testing...
snapshots=[f'OpaqueRef:12345678-1234-1234-1234-1234567890{i:02x}'] * 3,
allowed_operations=[
xen_types.VMOperations.START,
xen_types.VMOperations.CLONE,
xen_types.VMOperations.COPY,
xen_types.VMOperations.SNAPSHOT,
],
folder=f'/test_folder_{i//4}',
)
for i in range(16)
]
POOL_NAME = DEF_POOL_NAME
CHANGE_STATE_OPAQUE_REF = DEF_CHANGE_STATE_OPAQUE_REF
TASK_INFO = DEF_TASK_INFO
SRS_INFO = DEF_SRS_INFO.copy()
NETWORKS_INFO = DEF_NETWORKS_INFO.copy()
VMS_INFO = DEF_VMS_INFO.copy()
def initialize_defaults() -> None:
def reset_data() -> None:
"""
Initialize default values for the module variables
"""
global TASK_INFO, SRS_INFO
# Import non local variables
global TASK_INFO, POOL_NAME, CHANGE_STATE_OPAQUE_REF
TASK_INFO = DEF_TASK_INFO
SRS_INFO = DEF_SRS_INFO
POOL_NAME = DEF_POOL_NAME
CHANGE_STATE_OPAQUE_REF = DEF_CHANGE_STATE_OPAQUE_REF
SRS_INFO[:] = DEF_SRS_INFO
NETWORKS_INFO[:] = DEF_NETWORKS_INFO
VMS_INFO[:] = DEF_VMS_INFO
T = typing.TypeVar('T')
def random_from_list(lst: list[T], *args: typing.Any, **kwargs: typing.Any) -> T:
"""
Returns a random VM
@ -144,7 +202,7 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
),
AutoSpecMethodInfo(
client.XenClient.get_pool_name,
returns='TEST_pool_NAME',
returns=POOL_NAME,
),
# Default login and logout, skip them
AutoSpecMethodInfo(
@ -165,16 +223,86 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
returns=search_by_attr,
partial_args=(SRS_INFO, 'opaque_ref'),
),
AutoSpecMethodInfo(
client.XenClient.list_networks,
returns=NETWORKS_INFO,
),
AutoSpecMethodInfo(
client.XenClient.get_network_info,
returns=search_by_attr,
partial_args=(NETWORKS_INFO, 'opaque_ref'),
),
AutoSpecMethodInfo(
client.XenClient.list_vms,
returns=VMS_INFO,
),
AutoSpecMethodInfo(
client.XenClient.get_vm_info,
returns=search_by_attr,
partial_args=(VMS_INFO, 'opaque_ref'),
),
AutoSpecMethodInfo(
client.XenClient.start_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.start_vm_sync,
returns=None,
),
AutoSpecMethodInfo(
client.XenClient.stop_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.stop_vm_sync,
returns=None,
),
AutoSpecMethodInfo(
client.XenClient.reset_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.reset_vm_sync,
returns=None,
),
AutoSpecMethodInfo(
client.XenClient.suspend_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.suspend_vm_sync,
returns=None,
),
AutoSpecMethodInfo(
client.XenClient.resume_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.resume_vm_sync,
returns=None,
),
AutoSpecMethodInfo(
client.XenClient.shutdown_vm,
returns=CHANGE_STATE_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.shutdown_vm_sync,
returns=None,
),
]
PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
'host': 'test.example.com',
'port': 443,
'username': 'root',
'password': 'some_test_password',
'concurrent_creation_limit': 18,
'concurrent_removal_limit': 7,
'macs_range': '02:99:00:00:00:00-02:AA:00:FF:FF:FF',
'verify_ssl': True,
'timeout': 30,
'host_backup': 'test_backup.example.com',
}
@ -220,8 +348,7 @@ def patched_provider(
) -> typing.Generator[provider.XenProvider, None, None]:
client = create_client_mock()
provider = create_provider(**kwargs)
with mock.patch.object(provider, '_api') as api:
api.return_value = client
provider._cached_api = client
yield provider

View File

@ -0,0 +1,179 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
import typing
from unittest import mock
from uds.core import types, ui, environment
from uds.services.Xen.provider import XenProvider
from . import fixtures
from ...utils.test import UDSTransactionTestCase
class TestXenProvider(UDSTransactionTestCase):
def test_provider_data(self) -> None:
"""
Test the provider
"""
provider = fixtures.create_provider() # Will not use client api, so no need to patch it
self.assertEqual(provider.host.as_str(), fixtures.PROVIDER_VALUES_DICT['host'])
self.assertEqual(provider.port.as_int(), fixtures.PROVIDER_VALUES_DICT['port'])
self.assertEqual(provider.username.as_str(), fixtures.PROVIDER_VALUES_DICT['username'])
self.assertEqual(provider.password.as_str(), fixtures.PROVIDER_VALUES_DICT['password'])
if not isinstance(provider.concurrent_creation_limit, ui.gui.NumericField):
self.fail('concurrent_creation_limit is not a NumericField')
self.assertEqual(
provider.concurrent_creation_limit.as_int(),
fixtures.PROVIDER_VALUES_DICT['concurrent_creation_limit'],
)
# concurrent_removal_limit
if not isinstance(provider.concurrent_removal_limit, ui.gui.NumericField):
self.fail('concurrent_creation_limit is not a NumericField')
self.assertEqual(
provider.concurrent_removal_limit.as_int(),
fixtures.PROVIDER_VALUES_DICT['concurrent_removal_limit'],
)
self.assertEqual(provider.timeout.as_int(), fixtures.PROVIDER_VALUES_DICT['timeout'])
self.assertEqual(provider.macs_range.as_str(), fixtures.PROVIDER_VALUES_DICT['macs_range'])
self.assertEqual(provider.get_macs_range(), fixtures.PROVIDER_VALUES_DICT['macs_range'])
self.assertEqual(provider.host_backup.as_str(), fixtures.PROVIDER_VALUES_DICT['host_backup'])
def test_provider_is_available(self) -> None:
"""
Test the provider is_available
Thi is "specieal" because it uses cache
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
# Fist, true result
api.test.return_value = True
self.assertEqual(provider.is_available(), True)
api.test.assert_called_once_with()
api.test.reset_mock() # Reset counter
# Now, even if set test to false, should return true due to cache
# To fail, make test mock raise an exception
api.test.side_effect = Exception('Testing exception')
self.assertEqual(provider.is_available(), True)
api.test.assert_not_called()
# clear cache of method
provider.is_available.cache_clear() # type: ignore # clear_cache is added by decorator
self.assertEqual(provider.is_available(), False)
api.test.assert_called_once_with()
def test_provider_get_connection(self) -> None:
"""
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
with provider.get_connection() as conn:
self.assertEqual(conn, api)
def test_provider_test(self) -> None:
"""
Test the provider
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
for ret_val in [True, False]:
api.test.reset_mock()
# Mock test_connection to return ret_val
# Mock test_connection to return ret_val
# Note that we must patch the class method, not the instance method
# Because a new instance is created on test
with mock.patch(
'uds.services.Xen.provider.XenProvider.test_connection',
side_effect=Exception('Testing exception') if not ret_val else None,
):
result = XenProvider.test(
environment.Environment.temporary_environment(), fixtures.PROVIDER_VALUES_DICT
)
self.assertIsInstance(result, types.core.TestResult)
self.assertEqual(result.success, ret_val)
self.assertIsInstance(result.error, str)
# Now, ensure test_connection calls api.test
provider.test_connection()
# Ensure test is called
api.test.assert_called_once_with()
def test_api_methods(self) -> None:
"""
Test the provider
"""
fixtures.reset_data()
with fixtures.patched_provider() as provider:
api = provider._api # typing.cast(mock.MagicMock, provider._api)
self.assertEqual(api.has_pool(), True)
self.assertEqual(api.get_pool_name(), fixtures.POOL_NAME)
self.assertEqual(api.check_login(), True)
self.assertEqual(api.get_task_info('task_id'), fixtures.TASK_INFO)
self.assertEqual(api.list_srs(), fixtures.SRS_INFO)
SR = random.choice(fixtures.SRS_INFO)
self.assertEqual(api.get_sr_info(SR.opaque_ref), SR)
self.assertEqual(api.list_networks(), fixtures.NETWORKS_INFO)
NETWORK = random.choice(fixtures.NETWORKS_INFO)
self.assertEqual(api.get_network_info(NETWORK.opaque_ref), NETWORK)
self.assertEqual(api.list_vms(), fixtures.VMS_INFO)
VM = random.choice(fixtures.VMS_INFO)
self.assertEqual(api.get_vm_info(VM.opaque_ref), VM)
# Test state changer, start_vm, start_vm_sync, ...
self.assertEqual(api.start_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.start_vm_sync(VM.opaque_ref))
self.assertEqual(api.stop_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.stop_vm_sync(VM.opaque_ref))
self.assertEqual(api.suspend_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.suspend_vm_sync(VM.opaque_ref))
self.assertEqual(api.resume_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.resume_vm_sync(VM.opaque_ref))
self.assertEqual(api.reset_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.reset_vm_sync(VM.opaque_ref))
self.assertEqual(api.shutdown_vm(VM.opaque_ref), fixtures.CHANGE_STATE_OPAQUE_REF)
self.assertIsNone(api.shutdown_vm_sync(VM.opaque_ref))

View File

@ -106,6 +106,14 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
tooltip=_('XenServer Server IP or Hostname'),
required=True,
)
port = gui.NumericField(
length=5,
label=_('Port'),
default=443,
order=2,
tooltip=_('XenServer Server Port'),
required=True,
)
username = gui.TextField(
length=32,
label=_('Username'),
@ -126,6 +134,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
macs_range = fields.macs_range_field(default='02:46:00:00:00:00-02:46:00:FF:FF:FF')
verify_ssl = fields.verify_ssl_field(old_field_name='verifySSL')
timeout = fields.timeout_field()
host_backup = gui.TextField(
length=64,
@ -144,6 +153,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
# If we want to connect to more than one server, we need keep locked access to api, change api server, etc..
# We have implemented an "exclusive access" client that will only connect to one server at a time (using locks)
# and this way all will be fine
@property
def _api(self) -> client.XenClient:
"""
Returns the connection API object for XenServer (using XenServersdk)
@ -152,11 +162,12 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
self._cached_api = client.XenClient(
self.host.value,
self.host_backup.value,
443,
self.port.value,
self.username.value,
self.password.value,
True,
self.verify_ssl.as_bool(),
self.timeout.value,
)
return self._cached_api
@ -168,7 +179,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
"""
self._use_count += 1
try:
yield self._api()
yield self._api
finally:
self._use_count -= 1
if self._use_count == 0 and self._cached_api:
@ -192,7 +203,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
True if all went fine, false if id didn't
"""
self._api().test()
self._api.test()
def get_macs_range(self) -> str:
return self.macs_range.value

View File

@ -51,6 +51,27 @@ TAG_MACHINE = "uds-machine"
def cache_key_helper(server_api: 'XenClient') -> str:
return server_api._url # pyright: ignore[reportPrivateUsage]
class SafeTimeoutTransport(xmlrpc.client.SafeTransport):
_timeout: int = 0
def set_timeout(self, timeout: int) -> None:
self._timeout = timeout
def make_connection(self, host: typing.Any) -> typing.Any:
conn = super().make_connection(host)
conn.timeout = self._timeout
return conn
class TimeoutTransport(xmlrpc.client.Transport):
_timeout: int = 0
def set_timeout(self, timeout: int) -> None:
self._timeout = timeout
def make_connection(self, host: typing.Any) -> typing.Any:
conn = super().make_connection(host)
conn.timeout = self._timeout
return conn
class XenClient: # pylint: disable=too-many-public-methods
_originalHost: str
@ -59,6 +80,7 @@ class XenClient: # pylint: disable=too-many-public-methods
_port: str
_use_ssl: bool
_verify_ssl: bool
_timeout: int
_protocol: str
_url: str
_logged_in: bool
@ -77,12 +99,14 @@ class XenClient: # pylint: disable=too-many-public-methods
password: str,
ssl: bool = False,
verify_ssl: bool = False,
timeout: int = 10,
):
self._originalHost = self._host = host
self._host_backup = host_backup or ''
self._port = str(port)
self._use_ssl = bool(ssl)
self._verify_ssl = bool(verify_ssl)
self._timeout = timeout
self._protocol = 'http' + ('s' if self._use_ssl else '') + '://'
self._url = ''
self._logged_in = False
@ -165,7 +189,12 @@ class XenClient: # pylint: disable=too-many-public-methods
if self._use_ssl:
context = security.create_client_sslcontext(verify=self._verify_ssl)
transport = xmlrpc.client.SafeTransport(context=context)
transport = SafeTimeoutTransport(context=context)
transport.set_timeout(self._timeout)
logger.debug('Transport: %s', transport)
else:
transport = TimeoutTransport()
transport.set_timeout(self._timeout)
logger.debug('Transport: %s', transport)
self._session = XenAPI.Session(self._url, transport=transport)
@ -276,14 +305,6 @@ class XenClient: # pylint: disable=too-many-public-methods
except XenAPI.Failure as e:
raise exceptions.XenFailure(e.details)
@cached(prefix='xen_vm_f', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_vm_folder(self, vmid: str, **kwargs: typing.Any) -> str:
try:
other_config = self.VM.get_other_config(vmid)
return other_config.get('folder', '')
except XenAPI.Failure as e:
raise exceptions.XenFailure(e.details)
def _start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_running():

View File

@ -223,7 +223,7 @@ class VMOperations(enum.StrEnum):
create_vtpm Creating and adding a VTPM to this VM
"""
SNAPSHOOT = 'snapshot'
SNAPSHOT = 'snapshot'
CLONE = 'clone'
COPY = 'copy'
CREATE_TEMPLATE = 'create_template'
@ -259,9 +259,9 @@ class VMOperations(enum.StrEnum):
CHANGING_MEMORY_LIMITS = 'changing_memory_limits'
CHANGING_SHADOW_MEMORY = 'changing_shadow_memory'
CHANGING_SHADOW_MEMORY_LIVE = 'changing_shadow_memory_live'
CHANGING_VCPUS = 'changing_VCPUs'
CHANGING_VCPUS_LIVE = 'changing_VCPUs_live'
CHANGING_NVRAM = 'changing_NVRAM'
CHANGING_VCPUS = 'changing_vcpus'
CHANGING_VCPUS_LIVE = 'changing_vcpus_live'
CHANGING_NVRAM = 'changing_nvram'
ASSERT_OPERATION_VALID = 'assert_operation_valid'
DATA_SOURCE_OP = 'data_source_op'
UPDATE_ALLOWED_OPERATIONS = 'update_allowed_operations'