1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Finishing tests for Xen provider and related fixes

This commit is contained in:
Adolfo Gómez García 2024-05-27 19:55:38 +02:00
parent 7851470059
commit 9d0a6c5b6f
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
13 changed files with 755 additions and 79 deletions

View File

@ -172,3 +172,18 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
api.list_snapshots.assert_called_with(int(vmid), None)
# restore snapshot
api.restore_snapshot.assert_called_with(int(vmid), None, fixtures.SNAPSHOTS_INFO[0].name)
def test_remove_and_free(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
with mock.patch.object(service, '_assigned_access') as assigned_access:
assigned_mock = mock.MagicMock()
assigned_access.return_value.__enter__.return_value = assigned_mock
service.remove_and_free('123')
assigned_mock.__contains__.assert_called_with('123')
assigned_mock.reset_mock()
assigned_mock.__contains__.return_value = True
service.remove_and_free('123')
assigned_mock.remove.assert_called_with('123')
assigned_mock.remove.assert_called_with('123')

View File

@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import dataclasses
import random
import typing
import datetime
@ -84,6 +85,23 @@ DEF_SRS_INFO: typing.Final[list[xen_types.StorageInfo]] = [
for i in range(8)
]
LOW_SPACE_SR_INFO: typing.Final[xen_types.StorageInfo] = xen_types.StorageInfo(
opaque_ref='OpaqueRef:12345678-1234-1234-1234-1234567890ff',
uuid='12345678-1234-1234-1234-1234567890ff',
name='low_space_sr',
description='Low space SR description',
allowed_operations=[],
VDIs=[],
PBDs=[],
virtual_allocation=32 * 1024,
physical_utilisation=32 * 1024,
physical_size=32 * 1024,
type='',
content_type='',
shared=True,
)
DEF_NETWORKS_INFO: typing.Final[list[xen_types.NetworkInfo]] = [
xen_types.NetworkInfo(
opaque_ref=f'OpaqueRef:12345678-1234-1234-1234-1234567890{i:02x}',
@ -141,29 +159,29 @@ DEF_FOLDERS: list[str] = list(set(vm.folder for vm in DEF_VMS_INFO))
POOL_NAME = DEF_POOL_NAME
GENERAL_OPAQUE_REF = DEF_GENERAL_OPAQUE_REF
FOLDERS = DEF_FOLDERS
TASK_INFO = DEF_TASK_INFO
TASK_INFO = dataclasses.replace(DEF_TASK_INFO) # Copy the object
GENERAL_IP = DEF_GENERAL_IP
GENERAL_MAC = DEF_GENERAL_MAC
SRS_INFO = DEF_SRS_INFO.copy()
NETWORKS_INFO = DEF_NETWORKS_INFO.copy()
VMS_INFO = DEF_VMS_INFO.copy()
FOLDERS = DEF_FOLDERS.copy()
def reset_data() -> None:
def clean() -> None:
"""
Initialize default values for the module variables
"""
# Import non local variables
global TASK_INFO, POOL_NAME, GENERAL_OPAQUE_REF, GENERAL_IP, GENERAL_MAC
TASK_INFO = DEF_TASK_INFO
TASK_INFO = dataclasses.replace(DEF_TASK_INFO)
POOL_NAME = DEF_POOL_NAME
GENERAL_OPAQUE_REF = DEF_GENERAL_OPAQUE_REF
GENERAL_IP = DEF_GENERAL_IP
GENERAL_MAC = DEF_GENERAL_MAC
SRS_INFO[:] = DEF_SRS_INFO
NETWORKS_INFO[:] = DEF_NETWORKS_INFO
VMS_INFO[:] = DEF_VMS_INFO
@ -203,6 +221,7 @@ def search_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwargs: ty
except ValueError:
raise xen_exceptions.XenNotFoundError(f'Item with {attribute}=="{value}" not found in list')
def set_vm_state(is_async: bool, state: xen_types.PowerState, vmid: str) -> 'str|None':
"""
Set the power state of a VM
@ -212,12 +231,22 @@ def set_vm_state(is_async: bool, state: xen_types.PowerState, vmid: str) -> 'str
vm.power_state = state
except ValueError:
raise xen_exceptions.XenNotFoundError(f'Item with opaque_ref=="{vmid}" not found in list')
if is_async:
return None
return GENERAL_OPAQUE_REF
def set_all_vm_state(state: xen_types.PowerState) -> None:
"""
Set the power state of all VMs
"""
for vm in VMS_INFO:
vm.power_state = state
def task_info(*args: typing.Any, **kwargs: typing.Any) -> xen_types.TaskInfo:
return TASK_INFO
# Methods that returns None or "internal" methods are not tested
CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
AutoSpecMethodInfo(
@ -236,7 +265,7 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
# Default test, skip it
AutoSpecMethodInfo(
client.XenClient.get_task_info,
returns=TASK_INFO,
returns=task_info,
),
AutoSpecMethodInfo(
client.XenClient.list_srs,
@ -337,7 +366,7 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
),
AutoSpecMethodInfo(
client.XenClient.provision_vm,
returns=GENERAL_OPAQUE_REF,
returns=GENERAL_OPAQUE_REF,
),
AutoSpecMethodInfo(
client.XenClient.create_snapshot,
@ -352,10 +381,7 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
returns=GENERAL_OPAQUE_REF,
),
# Returns vms as snapshots. As we are not going to really use them, we can return the same as VMS_INFO
AutoSpecMethodInfo(
client.XenClient.list_snapshots,
returns=VMS_INFO
),
AutoSpecMethodInfo(client.XenClient.list_snapshots, returns=VMS_INFO),
AutoSpecMethodInfo(
client.XenClient.list_folders,
returns=FOLDERS,

View File

@ -144,7 +144,7 @@ class TestXenProvider(UDSTransactionTestCase):
"""
Test the provider
"""
fixtures.reset_data()
fixtures.clean()
with fixtures.patched_provider() as provider:
api = provider._api # typing.cast(mock.MagicMock, provider._api)

View File

@ -47,7 +47,7 @@ from uds.services.Xen.xen import types as xen_types
class TestXenPublication(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.reset_data()
fixtures.clean()
def test_publication(self) -> None:
with fixtures.patched_provider() as provider:

View File

@ -34,7 +34,7 @@ import random
import typing
from unittest import mock
from uds.core import types, ui
from uds.core import ui
from . import fixtures
from ...utils.test import UDSTransactionTestCase
@ -42,10 +42,10 @@ from ...utils.test import UDSTransactionTestCase
from uds.services.Xen.xen import types as xen_types
class TestProxmoxFixedService(UDSTransactionTestCase):
class TestXenFixedService(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.reset_data()
fixtures.clean()
def test_service_is_available(self) -> None:
"""
@ -68,7 +68,7 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
self.assertFalse(service.is_avaliable())
api.test.assert_called_with()
def test_service_methods_1(self) -> None:
def test_service_vm_methods(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
VM = random.choice(fixtures.VMS_INFO)
@ -126,6 +126,21 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
userservice_instance.error.return_value,
)
def test_remove_and_free(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
with mock.patch.object(service, '_assigned_access') as assigned_access:
assigned_mock = mock.MagicMock()
assigned_access.return_value.__enter__.return_value = assigned_mock
service.remove_and_free('123')
assigned_mock.__contains__.assert_called_with('123')
assigned_mock.reset_mock()
assigned_mock.__contains__.return_value = True
service.remove_and_free('123')
assigned_mock.remove.assert_called_with('123')
assigned_mock.remove.assert_called_with('123')
def test_process_snapshot(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
@ -138,14 +153,14 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
# Create snapshot
api.list_snapshots.return_value = []
service.snapshot_creation(userservice_instance)
api.list_snapshots.assert_called_with(int(vmid), None)
api.create_snapshot.assert_called_with(int(vmid), None, 'UDS Snapshot', None)
api.list_snapshots.assert_called_with(vmid, full_info=False)
api.create_snapshot.assert_called_with(vmid, 'UDS Snapshot')
# Skip snapshot creation
api.reset_mock()
api.list_snapshots.return_value = fixtures.SNAPSHOTS_INFO
api.list_snapshots.return_value = fixtures.VMS_INFO
service.snapshot_recovery(userservice_instance)
api.list_snapshots.assert_called_with(int(vmid), None)
api.list_snapshots.assert_called_with(vmid, full_info=True)
api.create_snapshot.assert_not_called()
# Restore snapshot on exit
@ -153,14 +168,14 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
api.reset_mock()
api.list_snapshots.return_value = []
service.snapshot_creation(userservice_instance)
api.list_snapshots.assert_called_with(int(vmid), None)
api.list_snapshots.assert_called_with(vmid, False)
# no snapshots, so no restore
api.restore_snapshot.assert_not_called()
# Reset and add snapshot
api.reset_mock()
api.list_snapshots.return_value = fixtures.SNAPSHOTS_INFO
api.list_snapshots.return_value = fixtures.VMS_INFO
service.snapshot_recovery(userservice_instance)
api.list_snapshots.assert_called_with(int(vmid), None)
# restore snapshot
api.restore_snapshot.assert_called_with(int(vmid), None, fixtures.SNAPSHOTS_INFO[0].name)
api.list_snapshots.assert_called_with(vmid, full_info=True)
# restore snapshot, as this is a test, in fact the SNAPSHOTS are the same as the VMS
api.restore_snapshot.assert_called_with(fixtures.VMS_INFO[0].opaque_ref)

View File

@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
import typing
from unittest import mock
from uds.core.util import net
from . import fixtures
from ...utils.test import UDSTestCase
from uds.services.Xen.xen import types as xen_types
class TestXenLinkedService(UDSTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.clean()
def test_service_data(self) -> None:
service = fixtures.create_service_linked()
self.assertEqual(service.datastore.value, fixtures.SERVICE_VALUES_DICT['datastore'])
self.assertEqual(service.min_space_gb.value, fixtures.SERVICE_VALUES_DICT['min_space_gb'])
self.assertEqual(service.machine.value, fixtures.SERVICE_VALUES_DICT['machine'])
self.assertEqual(service.network.value, fixtures.SERVICE_VALUES_DICT['network'])
self.assertEqual(service.memory.value, fixtures.SERVICE_VALUES_DICT['memory'])
self.assertEqual(service.shadow.value, fixtures.SERVICE_VALUES_DICT['shadow'])
self.assertEqual(service.remove_duplicates.value, fixtures.SERVICE_VALUES_DICT['remove_duplicates'])
self.assertEqual(service.maintain_on_error.value, fixtures.SERVICE_VALUES_DICT['maintain_on_error'])
self.assertEqual(service.basename.value, fixtures.SERVICE_VALUES_DICT['basename'])
self.assertEqual(service.lenname.value, fixtures.SERVICE_VALUES_DICT['lenname'])
def test_has_datastore_space(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_linked(provider=provider)
api = typing.cast(mock.MagicMock, provider._api)
# Should not raise any exception
service.has_datastore_space()
api.get_sr_info.assert_called_with(service.datastore.value)
api.get_sr_info.return_value = fixtures.LOW_SPACE_SR_INFO
api.get_sr_info.side_effect = None # Reset side effect
# Should raise an exception
with self.assertRaises(Exception):
service.has_datastore_space()
def test_service_is_available(self) -> None:
"""
Test the provider
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
self.assertTrue(service.is_avaliable())
api.test.assert_called_with()
# With data cached, even if test fails, it will return True
api.test.side_effect = Exception('Testing exception')
self.assertTrue(service.is_avaliable())
# Data is cached, so we need to reset it
api.test.reset_mock()
service.provider().is_available.cache_clear() # type: ignore
# Now should return False as we have reset the cache
self.assertFalse(service.is_avaliable())
api.test.assert_called_with()
def test_start_deploy_of_template(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.start_deploy_of_template('name', 'comments')
# Ensure has space
api.get_sr_info.assert_called_with(service.datastore.value)
api.clone_vm.assert_called_with(service.machine.value, 'name', service.datastore.value)
def test_convert_to_template(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.convert_to_template('vm_opaque_ref')
api.convert_to_template.assert_called_with('vm_opaque_ref', service.shadow.value)
def test_start_deploy_from_template(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.start_deploy_from_template('template_opaque_ref', name='name', comments='comments')
api.start_deploy_from_template.assert_called_with('template_opaque_ref', 'name')
def test_delete_template(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.delete_template('template_opaque_ref')
api.delete_template.assert_called_once_with('template_opaque_ref')
def test_configure_machine(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.configure_machine('vm_opaque_ref', '00:01:02:03:04:05')
api.configure_vm.assert_called_once_with(
'vm_opaque_ref',
net_info={'network': service.network.value, 'mac': '00:01:02:03:04:05'},
memory=service.memory.value,
)
def test_get_mac(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_linked(provider=provider)
for _ in range(10):
mac = service.get_mac(mock.MagicMock(), 'vm_opaque_ref')
self.assertTrue(net.is_valid_mac(mac))
def test_is_running(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
VM = random.choice(fixtures.VMS_INFO)
for state in xen_types.PowerState:
VM.power_state = state
api.reset_mock()
# Only RUNNING state is considered as running
self.assertEqual(
service.is_running(mock.MagicMock(), VM.opaque_ref), state == xen_types.PowerState.RUNNING
)
api.get_vm_info.assert_called_with(VM.opaque_ref)
def test_start_stop_shutdown(self) -> None:
with fixtures.patched_provider() as provider:
service = fixtures.create_service_linked(provider=provider)
VM = random.choice(fixtures.VMS_INFO)
VM.power_state = xen_types.PowerState.HALTED
service.start(mock.MagicMock(), VM.opaque_ref)
self.assertEqual(VM.power_state, xen_types.PowerState.RUNNING)
service.stop(mock.MagicMock(), VM.opaque_ref)
self.assertEqual(VM.power_state, xen_types.PowerState.HALTED)
VM.power_state = xen_types.PowerState.RUNNING
service.shutdown(mock.MagicMock(), VM.opaque_ref)
self.assertEqual(VM.power_state, xen_types.PowerState.HALTED)
def test_delete(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
service.delete(mock.MagicMock(), 'vm_opaque_ref')
api.delete_vm.assert_called_once_with('vm_opaque_ref')

View File

@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from unittest import mock
from uds import models
from uds.core import types
from . import fixtures
from ...utils.test import UDSTransactionTestCase
from ...utils.generators import limited_iterator
from uds.services.Xen.xen import types as xen_types
# We use transactions on some related methods (storage access, etc...)
class TestXenFixedUserService(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clean()
fixtures.set_all_vm_state(xen_types.PowerState.HALTED)
def test_userservice_fixed_user(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
userservice = fixtures.create_userservice_fixed(service=service)
with service._assigned_access() as assigned_machines:
self.assertEqual(assigned_machines, set())
# patch userservice db_obj() method to return a mock
userservice_db = mock.MagicMock()
userservice.db_obj = mock.MagicMock(return_value=userservice_db)
# Test Deploy for cache, should set to error due
# to the fact fixed services cannot have cached items
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
self.assertEqual(state, types.states.TaskState.ERROR)
# Test Deploy for user
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
# userservice_db Should have halle set_in_use(True)
userservice_db.set_in_use.assert_called_once_with(True)
# vmid should have been assigned, so it must be in the assigned machines
with service._assigned_access() as assigned_machines:
self.assertEqual({userservice._vmid}, assigned_machines)
# Now, let's release the service
state = userservice.destroy()
self.assertEqual(state, types.states.TaskState.RUNNING)
while state == types.states.TaskState.RUNNING:
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
# must be empty now
with service._assigned_access() as assigned_machines:
self.assertEqual(assigned_machines, set())
# set_ready, machine is "started", set by mock fixture with the start invokation
state = userservice.set_ready()
self.assertEqual(state, types.states.TaskState.FINISHED)
# Set vm state to stopped
fixtures.set_all_vm_state(xen_types.PowerState.HALTED)
# Anc clear userservice cache
userservice.cache.clear()
state = userservice.set_ready()
self.assertEqual(state, types.states.TaskState.RUNNING) # Now running
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32):
state = userservice.check_state()
# Should be finished now
self.assertEqual(state, types.states.TaskState.FINISHED)

View File

@ -0,0 +1,299 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from uds import models
from uds.core import types
from . import fixtures
from ...utils.test import UDSTransactionTestCase
from ...utils.generators import limited_iterator
from uds.services.Xen.xen import types as xen_types
# We use transactions on some related methods (storage access, etc...)
class TestXenLinkedUserService(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clean()
fixtures.set_all_vm_state(xen_types.PowerState.HALTED)
def test_userservice_linked_cache_l1(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
publication = userservice.publication()
publication._vmid = '1'
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure that in the event of failure, we don't loop forever
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED, userservice._error_debug_info)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
mock.ANY,
True,
None,
service.datastore.value,
service.pool.value,
None,
)
# api.get_task should have been invoked at least once
self.assertTrue(api.get_task.called)
api.enable_machine_ha.assert_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_machine.assert_called_with(vmid)
def test_userservice_linked_cache_l2_no_ha(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
service.ha.value = '__' # Disabled
publication = userservice.publication()
publication._vmid = '1'
state = userservice.deploy_for_cache(level=types.services.CacheLevel.L2)
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
# If first item in queue is WAIT, we must "simulate" the wake up from os manager
if userservice._queue[0] == types.services.Operation.WAIT:
userservice.process_ready_from_os_manager(None)
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
mock.ANY,
True,
None,
service.datastore.value,
service.pool.value,
None,
)
# api.get_task should have been invoked at least once
self.assertTrue(api.get_task.called)
# Shoud not have been called since HA is disabled
api.enable_machine_ha.assert_not_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
# Now, start should have been called
api.start_machine.assert_called_with(vmid)
# Stop machine should have been called
api.shutdown_machine.assert_called_with(vmid)
def test_userservice_linked_user(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
publication = userservice.publication()
publication._vmid = '1'
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(
state,
types.states.TaskState.FINISHED,
f'Queue: {userservice._queue}, reason: {userservice._reason}, extra_info: {userservice._error_debug_info}',
)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
mock.ANY,
True,
None,
service.datastore.value,
service.pool.value,
None,
)
# api.get_task should have been invoked at least once
self.assertTrue(api.get_task.called)
api.enable_machine_ha.assert_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_machine.assert_called_with(vmid)
# Ensure vm is stopped, because deployment should have started it (as api.start_machine was called)
fixtures.replace_vm_info(vmid, status='stopped')
# Set ready state with the valid machine
state = userservice.set_ready()
# Machine is stopped, so task must be RUNNING (opossed to FINISHED)
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32):
state = userservice.check_state()
# Should be finished now
self.assertEqual(state, types.states.TaskState.FINISHED)
def test_userservice_cancel(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api)
for graceful in [True, False]:
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
service.try_soft_shutdown.value = graceful
publication = userservice.publication()
publication._vmid = '1'
# Set machine state for fixture to started
fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='running') for i in range(len(fixtures.VMS_INFO))
]
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
# Invoke cancel
api.reset_mock()
state = userservice.cancel()
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure DESTROY_VALIDATOR is in the queue
self.assertIn(types.services.Operation.DESTROY_VALIDATOR, userservice._queue)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
# Now, should be finished without any problem, no call to api should have been done
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(len(api.mock_calls), 0)
# Now again, but process check_queue a couple of times before cancel
# we we have an _vmid
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
if userservice._vmid:
break
current_op = userservice._current_op()
state = userservice.cancel()
self.assertEqual(state, types.states.TaskState.RUNNING)
self.assertEqual(userservice._queue[0], current_op)
if graceful:
self.assertIn(types.services.Operation.SHUTDOWN, userservice._queue)
self.assertIn(types.services.Operation.SHUTDOWN_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.STOP, userservice._queue)
self.assertIn(types.services.Operation.STOP_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.DELETE, userservice._queue)
self.assertIn(types.services.Operation.DELETE_COMPLETED, userservice._queue)
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
if counter > 5:
# Set machine state for fixture to stopped
fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='stopped')
for i in range(len(fixtures.VMS_INFO))
]
self.assertEqual(state, types.states.TaskState.FINISHED)
if graceful:
api.shutdown_machine.assert_called()
else:
api.stop_machine.assert_called()
def test_userservice_basics(self) -> None:
with fixtures.patched_provider():
userservice = fixtures.create_userservice_linked()
userservice.set_ip('1.2.3.4')
self.assertEqual(userservice.get_ip(), '1.2.3.4')

View File

@ -156,7 +156,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
self.userservices_limit = len(self.machines.as_list())
@contextlib.contextmanager
def _assigned_access(self) -> collections.abc.Generator[set[str], None, None]:
def _assigned_access(self) -> typing.Iterator[set[str]]:
with self.storage.as_dict(atomic=True) as d:
machines: set[str] = d.get('vms', set())
initial_machines = machines.copy() # for comparison later

View File

@ -140,7 +140,7 @@ class XenLinkedUserService(DynamicUserService, autoserializable.AutoSerializable
) # oVirt don't let us to create machines with more than 15 chars!!!
comments = 'UDS Linked clone'
self._task = self.service().start_deploy_from_template(name, comments, template_id)
self._task = self.service().start_deploy_from_template(template_id, name=name, comments=comments)
if not self._task:
raise Exception('Can\'t create machine')

View File

@ -41,6 +41,8 @@ from uds.core.ui import gui
from .publication import XenPublication
from .deployment import XenLinkedUserService
from .xen import exceptions as xen_exceptions
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from .provider import XenProvider
@ -207,12 +209,15 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
logger.debug('Checking datastore space for %s: %s', self.datastore.value, info)
availableGB = (info.physical_size - info.physical_utilisation) // 1024
if availableGB < self.min_space_gb.as_int():
raise Exception(
raise xen_exceptions.XenFatalError(
'Not enough free space available: (Needs at least {} GB and there is only {} GB '.format(
self.min_space_gb.as_int(), availableGB
)
)
def is_avaliable(self) -> bool:
return self.provider().is_available()
def sanitized_name(self, name: str) -> str:
"""
Xen Seems to allow all kind of names
@ -251,26 +256,26 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
with self.provider().get_connection() as api:
api.convert_to_template(vm_opaque_ref, self.shadow.value)
def start_deploy_from_template(self, name: str, comments: str, template_opaque_ref: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
def start_deploy_from_template(self, template_opaque_ref: str, *, name: str, comments: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
Args:
name (str): Name (sanitized) of the machine
comments (str): Comments for machine
template_opaque_ref (str): Opaque reference of the template to deploy from
Args:
name (str): Name (sanitized) of the machine
comments (str): Comments for machine
template_opaque_ref (str): Opaque reference of the template to deploy from
Returns:
str: Id of the task created for this operation
"""
logger.debug('Deploying from template %s machine %s', template_opaque_ref, name)
Returns:
str: Id of the task created for this operation
"""
logger.debug('Deploying from template %s machine %s', template_opaque_ref, name)
with self.provider().get_connection() as api:
self.has_datastore_space()
with self.provider().get_connection() as api:
self.has_datastore_space()
return api.start_deploy_from_template(template_opaque_ref, name)
return api.start_deploy_from_template(template_opaque_ref, name)
def remove_template(self, template_opaque_ref: str) -> None:
def delete_template(self, template_opaque_ref: str) -> None:
"""
invokes removeTemplate from parent provider
"""
@ -312,7 +317,7 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
Can return a task, or None if no task is returned
"""
with self.provider().get_connection() as api:
api.start_vm(vmid, as_async=False)
api.start_vm(vmid)
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
"""
@ -320,11 +325,11 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
Can return a task, or None if no task is returned
"""
with self.provider().get_connection() as api:
api.stop_vm(vmid, as_async=False)
api.stop_vm(vmid)
def shutdown(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
with self.provider().get_connection() as api:
api.shutdown_vm(vmid, as_async=False)
api.shutdown_vm(vmid)
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
"""

View File

@ -137,9 +137,8 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
with self.provider().get_connection() as api:
if api.get_vm_info(vmid).power_state.is_running():
return api.stop_vm(vmid)
return '' # Already stopped
def reset_vm(self, vmid: str) -> str:
"""
@ -153,14 +152,14 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
with self.provider().get_connection() as api:
if api.get_vm_info(vmid).power_state.is_running():
return api.reset_vm(vmid)
return '' # Already stopped, no reset needed
def shutdown_vm(self, vmid: str) -> str:
with self.provider().get_connection() as api:
if api.get_vm_info(vmid).power_state.is_running():
if api.get_vm_info(vmid).power_state.is_running():
return api.shutdown_vm(vmid)
return '' # Already stopped
def is_avaliable(self) -> bool:
@ -197,8 +196,10 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
with self.provider().get_connection() as api:
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
snapshots = api.list_snapshots(vmid, full_info=False) # Only need ids, to check if there is any snapshot
snapshots = api.list_snapshots(
vmid, full_info=False
) # Only need ids, to check if there is any snapshot
logger.debug('Using snapshots')
# If no snapshot exists for this vm, try to create one for it on background
@ -212,7 +213,9 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
name='UDS Snapshot',
)
except Exception as e:
self.do_log(types.log.LogLevel.WARNING, 'Could not create SNAPSHOT for this VM. ({})'.format(e))
self.do_log(
types.log.LogLevel.WARNING, 'Could not create SNAPSHOT for this VM. ({})'.format(e)
)
def snapshot_recovery(self, userservice_instance: FixedUserService) -> None:
userservice_instance = typing.cast(XenFixedUserService, userservice_instance)
@ -220,7 +223,7 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
snapshots = api.list_snapshots(vmid)
snapshots = api.list_snapshots(vmid, full_info=True) # We need full info to restore it
if snapshots:
try:
@ -274,11 +277,8 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
with self.provider().get_connection() as conn:
return conn.get_vm_info(vmid).name
def remove_and_free(self, vmid: str) -> types.states.TaskState:
try:
with self._assigned_access() as assigned_vms:
assigned_vms.remove(vmid)
return types.states.TaskState.FINISHED
except Exception as e:
logger.warning('Cound not save assigned machines on fixed pool: %s', e)
raise
# default remove_and_free is ok
def is_ready(self, vmid: str) -> bool:
with self.provider().get_connection() as conn:
return conn.get_vm_info(vmid).power_state.is_running()

View File

@ -311,7 +311,10 @@ class XenClient: # pylint: disable=too-many-public-methods
return None
if vminfo.power_state == xen_types.PowerState.SUSPENDED:
return self.resume_vm(vm_opaque_ref, as_async)
if as_async:
return self.resume_vm(vm_opaque_ref)
else:
return self.resume_vm_sync(vm_opaque_ref)
return (self.Async if as_async else self).VM.start(vm_opaque_ref, False, False)
@ -346,7 +349,10 @@ class XenClient: # pylint: disable=too-many-public-methods
return None
if vminfo.power_state.is_suspended() is False:
return self.start_vm(vm_opaque_ref, as_async)
if as_async:
return self.start_vm(vm_opaque_ref)
else:
return self.start_vm_sync(vm_opaque_ref)
return (self.Async if as_async else self).VM.resume(vm_opaque_ref, False, False)
@ -356,38 +362,38 @@ class XenClient: # pylint: disable=too-many-public-methods
return None
return (self.Async if as_async else self).VM.clean_shutdown(vm_opaque_ref)
def start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._start_vm(vm_opaque_ref, as_async)) # We know it's not None on async
def start_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._start_vm(vm_opaque_ref, True)) # We know it's not None on async
def start_vm_sync(self, vm_opaque_ref: str) -> None:
self._start_vm(vm_opaque_ref, False)
def stop_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._stop_vm(vm_opaque_ref, as_async))
def stop_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._stop_vm(vm_opaque_ref, True))
def stop_vm_sync(self, vm_opaque_ref: str) -> None:
self._stop_vm(vm_opaque_ref, False)
def reset_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._reset_vm(vm_opaque_ref, as_async))
def reset_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._reset_vm(vm_opaque_ref, True))
def reset_vm_sync(self, vm_opaque_ref: str) -> None:
self._reset_vm(vm_opaque_ref, False)
def suspend_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._suspend_vm(vm_opaque_ref, as_async))
def suspend_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._suspend_vm(vm_opaque_ref, True))
def suspend_vm_sync(self, vm_opaque_ref: str) -> None:
self._suspend_vm(vm_opaque_ref, False)
def resume_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._resume_vm(vm_opaque_ref, as_async))
def resume_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._resume_vm(vm_opaque_ref, True))
def resume_vm_sync(self, vm_opaque_ref: str) -> None:
self._resume_vm(vm_opaque_ref, False)
def shutdown_vm(self, vm_opaque_ref: str, as_async: bool = True) -> str:
return typing.cast(str, self._shutdown_vm(vm_opaque_ref, as_async))
def shutdown_vm(self, vm_opaque_ref: str) -> str:
return typing.cast(str, self._shutdown_vm(vm_opaque_ref, True))
def shutdown_vm_sync(self, vm_opaque_ref: str) -> None:
self._shutdown_vm(vm_opaque_ref, False)