From 6e2980cffc0f76c4eab46b1c2f75d26efb6d9280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Mon, 18 Mar 2024 00:21:49 +0100 Subject: [PATCH] Added try soft shutdown to ovirt --- .../uds/services/OVirt/deployment_linked.py | 79 +++++++++++-- .../src/uds/services/OVirt/service_linked.py | 5 + .../src/uds/services/OpenGnsys/deployment.py | 2 +- .../uds/services/Proxmox/deployment_linked.py | 8 +- .../src/uds/services/Sample/deployment_two.py | 2 +- server/src/uds/services/Test/deployment.py | 2 +- .../services/openstack/test_userservice.py | 2 +- .../services/ovirt/test_userservice_linked.py | 108 +++++++++++++++++- .../proxmox/test_userservice_fixed.py | 2 +- .../proxmox/test_userservice_linked.py | 8 +- 10 files changed, 194 insertions(+), 24 deletions(-) diff --git a/server/src/uds/services/OVirt/deployment_linked.py b/server/src/uds/services/OVirt/deployment_linked.py index f68a302f5..3ab69b2fc 100644 --- a/server/src/uds/services/OVirt/deployment_linked.py +++ b/server/src/uds/services/OVirt/deployment_linked.py @@ -39,6 +39,7 @@ import typing from uds.core import consts, services, types from uds.core.managers.userservice import UserServiceManager from uds.core.util import autoserializable, log +from uds.core.util.model import sql_stamp_seconds from .jobs import OVirtDeferredRemoval from .ovirt import types as ov_types @@ -72,6 +73,7 @@ class Operation(enum.IntEnum): FINISH = 7 RETRY = 8 CHANGEMAC = 9 + GRACEFUL_STOP = 10 opUnknown = 99 @@ -297,7 +299,7 @@ if sys.platform == 'win32': self._init_queue_for_deploy(False) return self._execute_queue() - def deploy_for_cache(self, level: int) -> types.states.TaskState: + def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState: """ Deploys an service instance for cache """ @@ -390,7 +392,7 @@ if sys.platform == 'win32': self._set_checks_counter(0) try: - operation_runner = EXECUTORS[op] + operation_runner = _EXECUTORS[op] operation_runner(self) @@ -476,6 +478,28 @@ if sys.platform == 'win32': self.service().provider().api.stop_machine(self._vmid) + def _gracely_stop(self) -> None: + """ + Tries to stop machine using qemu guest tools + If it takes too long to stop, or qemu guest tools are not installed, + will use "power off" "a las bravas" + """ + self._task = '' + shutdown = -1 # Means machine already stopped + try: + vm_info = self.service().provider().api.get_machine_info(self._vmid) + if vm_info.status == ov_types.VMStatus.UNKNOWN: + raise Exception('Not found') + except Exception as e: + raise Exception('Machine not found on stop machine') from e + + if vm_info.status in UP_STATES: + self.service().provider().api.shutdown_machine(self._vmid) + shutdown = sql_stamp_seconds() + + logger.debug('Stoped vm using guest tools') + self.storage.save_pickled('shutdown', shutdown) + def _suspend_machine(self) -> None: """ Suspends the machine @@ -558,6 +582,40 @@ if sys.platform == 'win32': """ return types.states.TaskState.FINISHED + def _graceful_stop_checker(self) -> types.states.TaskState: + """ + Check if the machine has gracely stopped (timed shutdown) + """ + with self.storage.as_dict() as data: + shutdown_start = data.get('shutdown', -1) + logger.debug('Shutdown start: %s', shutdown_start) + if shutdown_start < 0: # Was already stopped + # Machine is already stop + logger.debug('Machine WAS stopped') + return types.states.TaskState.FINISHED + + if shutdown_start == 0: # Was shut down a las bravas + logger.debug('Macine DO NOT HAVE guest tools') + return self._stop_checker() + + logger.debug('Checking State') + # Check if machine is already stopped + if self.service().provider().api.get_machine_info(self._vmid).status in DOWN_STATES: + return types.states.TaskState.FINISHED # It's stopped + + logger.debug('State is running') + if sql_stamp_seconds() - shutdown_start > consts.os.MAX_GUEST_SHUTDOWN_WAIT: + logger.debug('Time is consumed, falling back to stop') + self.do_log( + log.LogLevel.ERROR, + f'Could not shutdown machine using soft power off in time ({consts.os.MAX_GUEST_SHUTDOWN_WAIT} seconds). Powering off.', + ) + # Not stopped by guest in time, but must be stopped normally + self.storage.save_pickled('shutdown', 0) + self._stop_machine() # Launch "hard" stop + + return types.states.TaskState.RUNNING + def check_state(self) -> types.states.TaskState: """ Check what operation is going on, and acts acordly to it @@ -572,7 +630,7 @@ if sys.platform == 'win32': return types.states.TaskState.FINISHED try: - operation_checker = CHECKERS[op] + operation_checker = _CHECKERS[op] state = operation_checker(self) if state == types.states.TaskState.FINISHED: @@ -624,11 +682,16 @@ if sys.platform == 'win32': if op == Operation.ERROR: return self._error('Machine is already in error state!') + # Take into account the try to stop gracefully + graceful_stop: list[Operation] = ( + [] if not self.service().try_graceful_shutdown() else [Operation.GRACEFUL_STOP] + ) + if op in (Operation.FINISH, Operation.WAIT): - self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH] + self._queue = graceful_stop + [Operation.STOP, Operation.REMOVE, Operation.FINISH] return self._execute_queue() - self._queue = [op, Operation.STOP, Operation.REMOVE, Operation.FINISH] + self._queue = [op] + graceful_stop + [Operation.STOP, Operation.REMOVE, Operation.FINISH] # Do not execute anything.here, just continue normally return types.states.TaskState.RUNNING @@ -671,11 +734,12 @@ if sys.platform == 'win32': ) -EXECUTORS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], None]] = { +_EXECUTORS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], None]] = { Operation.CREATE: OVirtLinkedUserService._create, Operation.RETRY: OVirtLinkedUserService._retry, Operation.START: OVirtLinkedUserService._start_machine, Operation.STOP: OVirtLinkedUserService._stop_machine, + Operation.GRACEFUL_STOP: OVirtLinkedUserService._gracely_stop, Operation.SUSPEND: OVirtLinkedUserService._suspend_machine, Operation.WAIT: OVirtLinkedUserService._wait, Operation.REMOVE: OVirtLinkedUserService._remove, @@ -683,12 +747,13 @@ EXECUTORS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], No } -CHECKERS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], types.states.TaskState]] = { +_CHECKERS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], types.states.TaskState]] = { Operation.CREATE: OVirtLinkedUserService._create_checker, Operation.RETRY: OVirtLinkedUserService._retry_checker, Operation.WAIT: OVirtLinkedUserService._wait_checker, Operation.START: OVirtLinkedUserService._start_checker, Operation.STOP: OVirtLinkedUserService._stop_checker, + Operation.GRACEFUL_STOP: OVirtLinkedUserService._graceful_stop_checker, Operation.SUSPEND: OVirtLinkedUserService._suspend_checker, Operation.REMOVE: OVirtLinkedUserService._remove_checker, Operation.CHANGEMAC: OVirtLinkedUserService._mac_checker, diff --git a/server/src/uds/services/OVirt/service_linked.py b/server/src/uds/services/OVirt/service_linked.py index eb339fbbe..d90e23cef 100644 --- a/server/src/uds/services/OVirt/service_linked.py +++ b/server/src/uds/services/OVirt/service_linked.py @@ -137,6 +137,8 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m old_field_name='minSpaceGB', ) + try_soft_shutdown = fields.soft_shutdown_field() + machine = gui.ChoiceField( label=_("Base Machine"), order=110, @@ -344,3 +346,6 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m def is_avaliable(self) -> bool: return self.provider().is_available() + + def try_graceful_shutdown(self) -> bool: + return self.try_soft_shutdown.as_bool() diff --git a/server/src/uds/services/OpenGnsys/deployment.py b/server/src/uds/services/OpenGnsys/deployment.py index 397d3cbfa..94335d388 100644 --- a/server/src/uds/services/OpenGnsys/deployment.py +++ b/server/src/uds/services/OpenGnsys/deployment.py @@ -184,7 +184,7 @@ class OpenGnsysUserService(services.UserService, autoserializable.AutoSerializab self._init_queue_for_deploy() return self._execute_queue() - def deploy_for_cache(self, level: int) -> types.states.TaskState: + def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState: """ Deploys an service instance for cache """ diff --git a/server/src/uds/services/Proxmox/deployment_linked.py b/server/src/uds/services/Proxmox/deployment_linked.py index 4f9a662a6..1febae529 100644 --- a/server/src/uds/services/Proxmox/deployment_linked.py +++ b/server/src/uds/services/Proxmox/deployment_linked.py @@ -260,8 +260,8 @@ if sys.platform == 'win32': self._init_queue_for_deploy(level == types.services.CacheLevel.L2) return self._execute_queue() - def _init_queue_for_deploy(self, forLevel2: bool = False) -> None: - if forLevel2 is False: + def _init_queue_for_deploy(self, cache_l2: bool = False) -> None: + if cache_l2 is False: self._queue = [Operation.CREATE, Operation.GET_MAC, Operation.START, Operation.FINISH] else: self._queue = [ @@ -365,9 +365,9 @@ if sys.platform == 'win32': def _wait_checker(self) -> types.states.TaskState: """ - This method is not used, because wait operation is never used + Wait checker waits forever, until something external wakes it up """ - return types.states.TaskState.FINISHED + return types.states.TaskState.RUNNING def _create(self) -> None: """ diff --git a/server/src/uds/services/Sample/deployment_two.py b/server/src/uds/services/Sample/deployment_two.py index 74b18de6c..b1438a99f 100644 --- a/server/src/uds/services/Sample/deployment_two.py +++ b/server/src/uds/services/Sample/deployment_two.py @@ -297,7 +297,7 @@ class SampleUserServiceTwo(services.UserService): return types.states.TaskState.RUNNING - def deploy_for_cache(self, level: int) -> types.states.TaskState: + def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState: """ Deploys a user deployment as cache. diff --git a/server/src/uds/services/Test/deployment.py b/server/src/uds/services/Test/deployment.py index 1ffd8a8a1..fc140531d 100644 --- a/server/src/uds/services/Test/deployment.py +++ b/server/src/uds/services/Test/deployment.py @@ -108,7 +108,7 @@ class TestUserService(services.UserService): self.data.count = 3 return types.states.TaskState.RUNNING - def deploy_for_cache(self, level: int) -> types.states.TaskState: + def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState: logger.info('Deploying for cache %s %s', level, self.data) self.data.count = 3 return types.states.TaskState.RUNNING diff --git a/server/tests/services/openstack/test_userservice.py b/server/tests/services/openstack/test_userservice.py index be8c34c97..6d607cc44 100644 --- a/server/tests/services/openstack/test_userservice.py +++ b/server/tests/services/openstack/test_userservice.py @@ -62,7 +62,7 @@ class TestOpenstackLiveDeployment(UDSTransactionTestCase): publication._template_id = 'snap1' if to_test == 'cache': - state = userservice.deploy_for_cache(level=1) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1) else: state = userservice.deploy_for_user(models.User()) diff --git a/server/tests/services/ovirt/test_userservice_linked.py b/server/tests/services/ovirt/test_userservice_linked.py index 0064b6259..d73575abd 100644 --- a/server/tests/services/ovirt/test_userservice_linked.py +++ b/server/tests/services/ovirt/test_userservice_linked.py @@ -30,6 +30,7 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ +from uds import models from uds.core import types from uds.services.OVirt.deployment_linked import Operation @@ -50,10 +51,11 @@ class TestProxmovLinkedService(UDSTransactionTestCase): vm.status = ov_types.VMStatus.DOWN def test_max_check_works(self) -> None: + # Tests that the userservice does not gets stuck in a loop if cannot complete some operation with fixtures.patch_provider_api() as _api: userservice = fixtures.create_linked_userservice() - state = userservice.deploy_for_cache(level=1) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1) for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): state = userservice.check_state() @@ -65,7 +67,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): def test_userservice_linked_cache_l1(self) -> None: """ - Test the user service + Test the user service for cache l1 """ with fixtures.patch_provider_api() as api: userservice = fixtures.create_linked_userservice() @@ -75,7 +77,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): _publication = userservice.publication() - state = userservice.deploy_for_cache(level=1) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1) self.assertEqual(state, types.states.TaskState.RUNNING) @@ -87,6 +89,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): vm = utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid) vm.status = ov_types.VMStatus.UP state = userservice.check_state() + self.assertEqual(state, types.states.TaskState.FINISHED) @@ -107,7 +110,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): def test_userservice_linked_cache_l2(self) -> None: """ - Test the user service + Test the user service for cache level 2 """ with fixtures.patch_provider_api() as api: userservice = fixtures.create_linked_userservice() @@ -117,7 +120,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): _publication = userservice.publication() - state = userservice.deploy_for_cache(level=2) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L2) self.assertEqual(state, types.states.TaskState.RUNNING) @@ -134,7 +137,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): vm.status = ov_types.VMStatus.SUSPENDED state = userservice.check_state() - # If first item in queue is WAIT, we must "simulate" the wake up + # If first item in queue is WAIT, we must "simulate" the wake up from os manager if userservice._queue[0] == Operation.WAIT: state = userservice.process_ready_from_os_manager(None) @@ -154,3 +157,96 @@ class TestProxmovLinkedService(UDSTransactionTestCase): api.update_machine_mac.assert_called() api.fix_usb.assert_called() api.start_machine.assert_called() + + def test_userservice_linked_user(self) -> None: + """ + Test the user service for user deployment + """ + with fixtures.patch_provider_api() as api: + userservice = fixtures.create_linked_userservice() + + service = userservice.service() + service.usb.value = 'native' # With usb + + _publication = userservice.publication() + + state = userservice.deploy_for_user(models.User()) + + self.assertEqual(state, types.states.TaskState.RUNNING) + + # Ensure that in the event of failure, we don't loop forever + for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): + # this user service expects the machine to be started at some point, so after a few iterations, we set it to started + # note that the user service has a counter for max "recheck" without any success, and if reached, it will fail + if counter == 12: + vm = utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid) + vm.status = ov_types.VMStatus.UP + state = userservice.check_state() + + self.assertEqual(state, types.states.TaskState.FINISHED) + + self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename()) + self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname()) + + # Assarts has an vmid + self.assertTrue(bool(userservice._vmid)) + + # Assert several deploy api methods has been called, no matter args + # tested on other tests + api.get_storage_info.assert_called() + api.deploy_from_template.assert_called() + api.get_machine_info.assert_called() + api.update_machine_mac.assert_called() + api.fix_usb.assert_called() + api.start_machine.assert_called() + + def test_userservice_cancel(self) -> None: + """ + Test the user service + """ + with fixtures.patch_provider_api() as _api: + for graceful in [True, False]: + userservice = fixtures.create_linked_userservice() + service = userservice.service() + service.try_soft_shutdown.value = graceful + publication = userservice.publication() + publication._vmid = '1' + + # Set machine state for fixture to started + fixtures.VMS_INFO = [ + fixtures.VMS_INFO[i]._replace(status='running') for i in range(len(fixtures.VMS_INFO)) + ] + + state = userservice.deploy_for_user(models.User()) + + self.assertEqual(state, types.states.TaskState.RUNNING) + + current_op = userservice._get_current_op() + + # Invoke cancel + state = userservice.cancel() + + self.assertEqual(state, types.states.TaskState.RUNNING) + + self.assertEqual( + userservice._queue, + [current_op] + + ([Operation.GRACEFUL_STOP] if graceful else []) + + [Operation.STOP, Operation.REMOVE, Operation.FINISH], + ) + + for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): + state = userservice.check_state() + if counter > 5: + # Set machine state for fixture to stopped + fixtures.VMS_INFO = [ + fixtures.VMS_INFO[i]._replace(status='stopped') + for i in range(len(fixtures.VMS_INFO)) + ] + + self.assertEqual(state, types.states.TaskState.FINISHED) + + if graceful: + _api.shutdown_machine.assert_called() + else: + _api.stop_machine.assert_called() diff --git a/server/tests/services/proxmox/test_userservice_fixed.py b/server/tests/services/proxmox/test_userservice_fixed.py index 56a489096..b737a88d1 100644 --- a/server/tests/services/proxmox/test_userservice_fixed.py +++ b/server/tests/services/proxmox/test_userservice_fixed.py @@ -60,7 +60,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): # Test Deploy for cache, should raise Exception due # to the fact fixed services cannot have cached items with self.assertRaises(Exception): - userservice.deploy_for_cache(level=1) + userservice.deploy_for_cache(level=types.services.CacheLevel.L1) state = userservice.deploy_for_user(models.User()) diff --git a/server/tests/services/proxmox/test_userservice_linked.py b/server/tests/services/proxmox/test_userservice_linked.py index 0d51e76a1..423cb1479 100644 --- a/server/tests/services/proxmox/test_userservice_linked.py +++ b/server/tests/services/proxmox/test_userservice_linked.py @@ -61,7 +61,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase): publication = userservice.publication() publication._vmid = '1' - state = userservice.deploy_for_cache(level=1) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1) self.assertEqual(state, types.states.TaskState.RUNNING) @@ -114,12 +114,16 @@ class TestProxmovLinkedService(UDSTransactionTestCase): publication = userservice.publication() publication._vmid = '1' - state = userservice.deploy_for_cache(level=2) + state = userservice.deploy_for_cache(level=types.services.CacheLevel.L2) self.assertEqual(state, types.states.TaskState.RUNNING) for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): state = userservice.check_state() + + # If first item in queue is WAIT, we must "simulate" the wake up from os manager + if userservice._queue[0] == Operation.WAIT: + state = userservice.process_ready_from_os_manager(None) self.assertEqual(state, types.states.TaskState.FINISHED)