mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-12 04:58:34 +03:00
Fixing dynamic service and proxmox implementation using this
This commit is contained in:
parent
83f4359f90
commit
165d3bde21
@ -104,10 +104,12 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
|
||||
def publish(self) -> types.states.TaskState:
|
||||
""" """
|
||||
self._queue = self._publish_queue
|
||||
self._queue = self._publish_queue.copy()
|
||||
self._debug('publish')
|
||||
return self._execute_queue()
|
||||
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('execute_queue')
|
||||
op = self._current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
@ -123,10 +125,9 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
if op.is_custom():
|
||||
self.op_custom(op)
|
||||
else:
|
||||
operation_runner = _EXECUTORS[op]
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
operation_runner = _EXECUTORS[op]
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
@ -161,8 +162,10 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
if op.is_custom():
|
||||
state = self.op_custom_checker(op)
|
||||
else:
|
||||
state = _CHECKERS[op](self)
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
operation_checker = _CHECKERS[op]
|
||||
state = getattr(self, operation_checker.__name__)()
|
||||
if state == types.states.TaskState.FINISHED:
|
||||
# Remove runing op
|
||||
self._queue.pop(0)
|
||||
@ -172,7 +175,6 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
except Exception as e:
|
||||
return self._error(e)
|
||||
|
||||
|
||||
@typing.final
|
||||
def destroy(self) -> types.states.TaskState:
|
||||
"""
|
||||
@ -180,13 +182,17 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
"""
|
||||
self._is_flagged_for_destroy = False # Reset flag
|
||||
op = self._current_op()
|
||||
|
||||
# If already removing, do nothing
|
||||
if op == Operation.REMOVE:
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return self._error('Machine is already in error state!')
|
||||
|
||||
# If a "paused" state, reset queue to destroy
|
||||
if op == Operation.FINISH:
|
||||
self._queue = self._destroy_queue
|
||||
self._queue = self._destroy_queue.copy()
|
||||
return self._execute_queue()
|
||||
|
||||
# If must wait until finish, flag for destroy and wait
|
||||
@ -194,10 +200,18 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
self._is_flagged_for_destroy = True
|
||||
else:
|
||||
# If other operation, wait for finish before destroying
|
||||
self._queue = [op] + self._destroy_queue
|
||||
self._queue = [op] + self._destroy_queue # Copy not needed, will be copied anyway due to list concatenation
|
||||
# Do not execute anything.here, just continue normally
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
def cancel(self) -> types.states.TaskState:
|
||||
"""
|
||||
Cancels the publication (or cancels it if it's in the middle of a creation process)
|
||||
This can be overriden, just in case we need some special handling
|
||||
"""
|
||||
return self.destroy()
|
||||
|
||||
@typing.final
|
||||
def error_reason(self) -> str:
|
||||
return self._reason
|
||||
|
||||
@ -206,6 +220,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
def op_initialize(self) -> None:
|
||||
"""
|
||||
This method is called when the service is initialized
|
||||
Default initialization method sets the name and flags the service as not destroyed
|
||||
"""
|
||||
if self.check_space() is False:
|
||||
raise Exception('Not enough space to publish')
|
||||
@ -438,7 +453,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
|
||||
def _debug(self, txt: str) -> None:
|
||||
logger.debug(
|
||||
'Queue at %s for %s: %s, mac:%s, vmId:%s',
|
||||
'Queue at %s for %s: %s, vmid:%s',
|
||||
txt,
|
||||
self._name,
|
||||
[DynamicPublication._op2str(op) for op in self._queue],
|
||||
@ -450,9 +465,8 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
|
||||
|
||||
# This is a map of operations to methods
|
||||
# Operations, duwe to the fact that can be overrided some of them, must be invoked via instance
|
||||
# Basically, all methods starting with _ are final, and all other are overridable
|
||||
# We use __name__ later to use them, so we can use type checking and invoke them via instance
|
||||
# Operation methods, due to the fact that can be overrided, must be invoked via instance
|
||||
# We use getattr(FNC.__name__, ...) to use them, so we can use type checking and invoke them via instance
|
||||
# Note that ERROR and FINISH are not here, as they final states not needing to be executed
|
||||
_EXECUTORS: typing.Final[
|
||||
collections.abc.Mapping[Operation, collections.abc.Callable[[DynamicPublication], None]]
|
||||
|
@ -359,11 +359,11 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
return self._error('Machine is already in error state!')
|
||||
|
||||
shutdown_operations: list[Operation] = [] if not self.service().try_graceful_shutdown() else [Operation.SHUTDOWN, Operation.SHUTDOWN_COMPLETED]
|
||||
destroy_operations = shutdown_operations + self._destroy_queue
|
||||
destroy_operations = shutdown_operations + self._destroy_queue # copy is not needed due to list concatenation
|
||||
|
||||
# If a "paused" state, reset queue to destroy
|
||||
if op in (Operation.FINISH, Operation.WAIT):
|
||||
self._queue[:] = destroy_operations
|
||||
self._set_queue(destroy_operations)
|
||||
return self._execute_queue()
|
||||
|
||||
# If must wait until finish, flag for destroy and wait
|
||||
@ -371,7 +371,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
self._is_flagged_for_destroy = True
|
||||
else:
|
||||
# If other operation, wait for finish before destroying
|
||||
self._queue = [op] + destroy_operations
|
||||
self._set_queue([op] + destroy_operations)
|
||||
# Do not execute anything.here, just continue normally
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -661,9 +661,8 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
|
||||
|
||||
# This is a map of operations to methods
|
||||
# Operations, duwe to the fact that can be overrided some of them, must be invoked via instance
|
||||
# Basically, all methods starting with _ are final, and all other are overridable
|
||||
# We use __name__ later to use them, so we can use type checking and invoke them via instance
|
||||
# Operation methods, due to the fact that can be overrided, must be invoked via instance
|
||||
# We use getattr(FNC.__name__, ...) to use them, so we can use type checking and invoke them via instance
|
||||
# Note that ERROR and FINISH are not here, as they final states not needing to be executed
|
||||
_EXECUTORS: typing.Final[
|
||||
collections.abc.Mapping[Operation, collections.abc.Callable[[DynamicUserService], None]]
|
||||
|
@ -170,6 +170,13 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
# copy is needed to avoid modifying class var, and access using instance allowing to get, if provided, overriden queue
|
||||
self._queue = self._create_queue.copy()
|
||||
return self._execute_queue()
|
||||
|
||||
@typing.final
|
||||
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
|
||||
"""
|
||||
Fixed Userservice does not provided "cached" elements
|
||||
"""
|
||||
return self._error('Cache not supported')
|
||||
|
||||
@typing.final
|
||||
def assign(self, vmid: str) -> types.states.TaskState:
|
||||
|
@ -39,7 +39,6 @@ from uds.core import types
|
||||
from uds.core.services.specializations.dynamic_machine.dynamic_userservice import DynamicUserService, Operation
|
||||
from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.util import autoserializable
|
||||
from uds.core.util.model import sql_stamp_seconds
|
||||
|
||||
from . import client
|
||||
|
||||
@ -185,376 +184,6 @@ class ProxmoxUserserviceLinked(DynamicUserService, autoserializable.AutoSerializ
|
||||
def op_shutdown(self) -> None:
|
||||
return super().op_shutdown()
|
||||
|
||||
def _shutdown_machine(self) -> None:
|
||||
"""
|
||||
Tries to stop machine using qemu guest tools
|
||||
If it takes too long to stop, or qemu guest tools are not installed,
|
||||
will use "power off" "a las bravas"
|
||||
"""
|
||||
self._task = ''
|
||||
shutdown = -1 # Means machine already stopped
|
||||
try:
|
||||
vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
except client.ProxmoxConnectionError:
|
||||
self._retry_again()
|
||||
return
|
||||
except Exception as e:
|
||||
raise Exception('Machine not found on stop machine') from e
|
||||
|
||||
if vm_info.status != 'stopped':
|
||||
self._store_task(self.service().provider().shutdown_machine(int(self._vmid)))
|
||||
shutdown = sql_stamp_seconds()
|
||||
logger.debug('Stoped vm using guest tools')
|
||||
|
||||
self.storage.save_pickled('shutdown', shutdown)
|
||||
|
||||
# def _create(self) -> None:
|
||||
# """
|
||||
# Deploys a machine from template for user/cache
|
||||
# """
|
||||
# template_id = self.publication().machine()
|
||||
# name = self.get_name()
|
||||
# if name == consts.NO_MORE_NAMES:
|
||||
# raise Exception(
|
||||
# 'No more names available for this service. (Increase digits for this service to fix)'
|
||||
# )
|
||||
|
||||
# comments = 'UDS Linked clone'
|
||||
|
||||
# task_result = self.service().clone_machine(name, comments, template_id)
|
||||
|
||||
# self._store_task(task_result.upid)
|
||||
|
||||
# self._vmid = str(task_result.vmid)
|
||||
|
||||
# def _remove(self) -> None:
|
||||
# """
|
||||
# Removes a machine from system
|
||||
# """
|
||||
# try:
|
||||
# vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
# except Exception as e:
|
||||
# raise Exception('Machine not found on remove machine') from e
|
||||
|
||||
# if vm_info.status != 'stopped':
|
||||
# logger.debug('Info status: %s', vm_info)
|
||||
# self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
|
||||
# self._execute_queue()
|
||||
# self._store_task(self.service().remove_machine(int(self._vmid)))
|
||||
|
||||
# def _start_machine(self) -> None:
|
||||
# try:
|
||||
# vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
# except client.ProxmoxConnectionError:
|
||||
# self._retry_later()
|
||||
# return
|
||||
# except Exception as e:
|
||||
# raise Exception('Machine not found on start machine') from e
|
||||
|
||||
# if vm_info.status == 'stopped':
|
||||
# self._store_task(self.service().provider().start_machine(int(self._vmid)))
|
||||
|
||||
# def _stop_machine(self) -> None:
|
||||
# try:
|
||||
# vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
# except client.ProxmoxConnectionError:
|
||||
# self._retry_later()
|
||||
# return
|
||||
# except Exception as e:
|
||||
# raise Exception('Machine not found on stop machine') from e
|
||||
|
||||
# if vm_info.status != 'stopped':
|
||||
# logger.debug('Stopping machine %s', vm_info)
|
||||
# self._store_task(self.service().provider().stop_machine(int(self._vmid)))
|
||||
|
||||
# def _shutdown_machine(self) -> None:
|
||||
# try:
|
||||
# vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
# except client.ProxmoxConnectionError:
|
||||
# self._retry_later()
|
||||
# return
|
||||
# except Exception as e:
|
||||
# raise Exception('Machine not found or suspended machine') from e
|
||||
|
||||
# if vm_info.status != 'stopped':
|
||||
# self._store_task(self.service().provider().shutdown_machine(int(self._vmid)))
|
||||
|
||||
# def _gracely_stop(self) -> None:
|
||||
# """
|
||||
# Tries to stop machine using qemu guest tools
|
||||
# If it takes too long to stop, or qemu guest tools are not installed,
|
||||
# will use "power off" "a las bravas"
|
||||
# """
|
||||
# self._task = ''
|
||||
# shutdown = -1 # Means machine already stopped
|
||||
# try:
|
||||
# vm_info = self.service().get_machine_info(int(self._vmid))
|
||||
# except client.ProxmoxConnectionError:
|
||||
# self._retry_later()
|
||||
# return
|
||||
# except Exception as e:
|
||||
# raise Exception('Machine not found on stop machine') from e
|
||||
|
||||
# if vm_info.status != 'stopped':
|
||||
# self._store_task(self.service().provider().shutdown_machine(int(self._vmid)))
|
||||
# shutdown = sql_stamp_seconds()
|
||||
# logger.debug('Stoped vm using guest tools')
|
||||
# self.storage.save_pickled('shutdown', shutdown)
|
||||
|
||||
# def _update_machine_mac_and_ha(self) -> None:
|
||||
# try:
|
||||
# # Note: service will only enable ha if it is configured to do so
|
||||
# self.service().enable_machine_ha(int(self._vmid), True) # Enable HA before continuing here
|
||||
|
||||
# # Set vm mac address now on first interface
|
||||
# self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
|
||||
# except client.ProxmoxConnectionError:
|
||||
# self._retry_later()
|
||||
# return
|
||||
# except Exception as e:
|
||||
# logger.exception('Setting HA and MAC on proxmox')
|
||||
# raise Exception(f'Error setting MAC and HA on proxmox: {e}') from e
|
||||
|
||||
# # Check methods
|
||||
# def _check_task_finished(self) -> types.states.TaskState:
|
||||
# if self._task == '':
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# node, upid = self._retrieve_task()
|
||||
|
||||
# try:
|
||||
# task = self.service().provider().get_task_info(node, upid)
|
||||
# except client.ProxmoxConnectionError:
|
||||
# return types.states.TaskState.RUNNING # Try again later
|
||||
|
||||
# if task.is_errored():
|
||||
# return self._error(task.exitstatus)
|
||||
|
||||
# if task.is_completed():
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# return types.states.TaskState.RUNNING
|
||||
|
||||
# def _create_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Checks the state of a deploy for an user or cache
|
||||
# """
|
||||
# return self._check_task_finished()
|
||||
|
||||
# def _start_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Checks if machine has started
|
||||
# """
|
||||
# return self._check_task_finished()
|
||||
|
||||
# def _stop_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Checks if machine has stoped
|
||||
# """
|
||||
# return self._check_task_finished()
|
||||
|
||||
# def _shutdown_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Check if the machine has suspended
|
||||
# """
|
||||
# return self._check_task_finished()
|
||||
|
||||
# def _graceful_stop_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Check if the machine has gracely stopped (timed shutdown)
|
||||
# """
|
||||
# shutdown_start = self.storage.read_pickled('shutdown')
|
||||
# logger.debug('Shutdown start: %s', shutdown_start)
|
||||
# if shutdown_start < 0: # Was already stopped
|
||||
# # Machine is already stop
|
||||
# logger.debug('Machine WAS stopped')
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# if shutdown_start == 0: # Was shut down a las bravas
|
||||
# logger.debug('Macine DO NOT HAVE guest tools')
|
||||
# return self._stop_checker()
|
||||
|
||||
# logger.debug('Checking State')
|
||||
# # Check if machine is already stopped
|
||||
# if self.service().get_machine_info(int(self._vmid)).status == 'stopped':
|
||||
# return types.states.TaskState.FINISHED # It's stopped
|
||||
|
||||
# logger.debug('State is running')
|
||||
# if sql_stamp_seconds() - shutdown_start > consts.os.MAX_GUEST_SHUTDOWN_WAIT:
|
||||
# logger.debug('Time is consumed, falling back to stop')
|
||||
# self.do_log(
|
||||
# log.LogLevel.ERROR,
|
||||
# f'Could not shutdown machine using soft power off in time ({consts.os.MAX_GUEST_SHUTDOWN_WAIT} seconds). Powering off.',
|
||||
# )
|
||||
# # Not stopped by guest in time, but must be stopped normally
|
||||
# self.storage.save_pickled('shutdown', 0)
|
||||
# self._stop_machine() # Launch "hard" stop
|
||||
|
||||
# return types.states.TaskState.RUNNING
|
||||
|
||||
# def _remove_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Checks if a machine has been removed
|
||||
# """
|
||||
# return self._check_task_finished()
|
||||
|
||||
# def _mac_checker(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Checks if change mac operation has finished.
|
||||
|
||||
# Changing nic configuration is 1-step operation, so when we check it here, it is already done
|
||||
# """
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# def check_state(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Check what operation is going on, and acts acordly to it
|
||||
# """
|
||||
# self._debug('check_state')
|
||||
# op = self._get_current_op()
|
||||
|
||||
# if op == Operation.ERROR:
|
||||
# return types.states.TaskState.ERROR
|
||||
|
||||
# if op == Operation.FINISH:
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# try:
|
||||
# operation_checker = _CHECKERS.get(op, None)
|
||||
|
||||
# if operation_checker is None:
|
||||
# return self._error(f'Unknown operation found at check queue ({op})')
|
||||
|
||||
# state = operation_checker(self)
|
||||
# if state == types.states.TaskState.FINISHED:
|
||||
# self._pop_current_op() # Remove runing op
|
||||
# return self._execute_queue()
|
||||
|
||||
# return state
|
||||
# except Exception as e:
|
||||
# return self._error(e)
|
||||
|
||||
# def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
|
||||
# """
|
||||
# Moves machines between cache levels
|
||||
# """
|
||||
# if Operation.REMOVE in self._queue:
|
||||
# return types.states.TaskState.RUNNING
|
||||
|
||||
# if level == types.services.CacheLevel.L1:
|
||||
# self._queue = [Operation.START, Operation.FINISH]
|
||||
# else:
|
||||
# self._queue = [Operation.START, Operation.SHUTDOWN, Operation.FINISH]
|
||||
|
||||
# return self._execute_queue()
|
||||
|
||||
# def error_reason(self) -> str:
|
||||
# """
|
||||
# Returns the reason of the error.
|
||||
|
||||
# Remember that the class is responsible of returning this whenever asked
|
||||
# for it, and it will be asked everytime it's needed to be shown to the
|
||||
# user (when the administation asks for it).
|
||||
# """
|
||||
# return self._reason
|
||||
|
||||
# def destroy(self) -> types.states.TaskState:
|
||||
# """
|
||||
# Invoked for destroying a deployed service
|
||||
# """
|
||||
# self._debug('destroy')
|
||||
# if self._vmid == '':
|
||||
# self._queue = []
|
||||
# self._reason = "canceled"
|
||||
# return types.states.TaskState.FINISHED
|
||||
|
||||
# # If executing something, wait until finished to remove it
|
||||
# # We simply replace the execution queue
|
||||
# op = self._get_current_op()
|
||||
|
||||
# if op == Operation.ERROR:
|
||||
# return self._error('Machine is already in error state!')
|
||||
|
||||
# lst: list[Operation] = [] if not self.service().try_graceful_shutdown() else [Operation.GRACEFUL_STOP]
|
||||
# queue = lst + [Operation.STOP, Operation.REMOVE, Operation.FINISH]
|
||||
|
||||
# if op in (Operation.FINISH, Operation.WAIT):
|
||||
# self._queue[:] = queue
|
||||
# return self._execute_queue()
|
||||
|
||||
# self._queue = [op] + queue
|
||||
# # Do not execute anything.here, just continue normally
|
||||
# return types.states.TaskState.RUNNING
|
||||
|
||||
# def cancel(self) -> types.states.TaskState:
|
||||
# """
|
||||
# This is a task method. As that, the excepted return values are
|
||||
# State values RUNNING, FINISHED or ERROR.
|
||||
|
||||
# This can be invoked directly by an administration or by the clean up
|
||||
# of the deployed service (indirectly).
|
||||
# When administrator requests it, the cancel is "delayed" and not
|
||||
# invoked directly.
|
||||
# """
|
||||
# return self.destroy()
|
||||
|
||||
# @staticmethod
|
||||
# def _op2str(op: Operation) -> str:
|
||||
# return {
|
||||
# Operation.CREATE: 'create',
|
||||
# Operation.START: 'start',
|
||||
# Operation.STOP: 'stop',
|
||||
# Operation.SHUTDOWN: 'shutdown',
|
||||
# Operation.GRACEFUL_STOP: 'gracely stop',
|
||||
# Operation.REMOVE: 'remove',
|
||||
# Operation.WAIT: 'wait',
|
||||
# Operation.ERROR: 'error',
|
||||
# Operation.FINISH: 'finish',
|
||||
# Operation.RETRY: 'retry',
|
||||
# Operation.GET_MAC: 'getting mac',
|
||||
# }.get(op, '????')
|
||||
|
||||
# def _debug(self, txt: str) -> None:
|
||||
# logger.debug(
|
||||
# 'State at %s: name: %s, ip: %s, mac: %s, vmid:%s, queue: %s',
|
||||
# txt,
|
||||
# self._name,
|
||||
# self._ip,
|
||||
# self._mac,
|
||||
# self._vmid,
|
||||
# [ProxmoxUserserviceLinked._op2str(op) for op in self._queue],
|
||||
# )
|
||||
|
||||
# _EXECUTORS: typing.Final[
|
||||
# collections.abc.Mapping[
|
||||
# Operation, typing.Optional[collections.abc.Callable[[ProxmoxUserserviceLinked], None]]
|
||||
# ]
|
||||
# ] = {
|
||||
# Operation.CREATE: ProxmoxUserserviceLinked._create,
|
||||
# Operation.RETRY: ProxmoxUserserviceLinked._retry,
|
||||
# Operation.START: ProxmoxUserserviceLinked._start_machine,
|
||||
# Operation.STOP: ProxmoxUserserviceLinked._stop_machine,
|
||||
# Operation.GRACEFUL_STOP: ProxmoxUserserviceLinked._gracely_stop,
|
||||
# Operation.SHUTDOWN: ProxmoxUserserviceLinked._shutdown_machine,
|
||||
# Operation.WAIT: ProxmoxUserserviceLinked._wait,
|
||||
# Operation.REMOVE: ProxmoxUserserviceLinked._remove,
|
||||
# Operation.GET_MAC: ProxmoxUserserviceLinked._update_machine_mac_and_ha,
|
||||
# }
|
||||
|
||||
# _CHECKERS: dict[
|
||||
# Operation, typing.Optional[collections.abc.Callable[[ProxmoxUserserviceLinked], types.states.TaskState]]
|
||||
# ] = {
|
||||
# Operation.CREATE: ProxmoxUserserviceLinked._create_checker,
|
||||
# Operation.RETRY: ProxmoxUserserviceLinked._retry_checker,
|
||||
# Operation.WAIT: ProxmoxUserserviceLinked._wait_checker,
|
||||
# Operation.START: ProxmoxUserserviceLinked._start_checker,
|
||||
# Operation.STOP: ProxmoxUserserviceLinked._stop_checker,
|
||||
# Operation.GRACEFUL_STOP: ProxmoxUserserviceLinked._graceful_stop_checker,
|
||||
# Operation.SHUTDOWN: ProxmoxUserserviceLinked._shutdown_checker,
|
||||
# Operation.REMOVE: ProxmoxUserserviceLinked._remove_checker,
|
||||
# Operation.GET_MAC: ProxmoxUserserviceLinked._mac_checker,
|
||||
# }
|
||||
|
||||
def get_console_connection(
|
||||
self,
|
||||
) -> typing.Optional[types.services.ConsoleConnectionInfo]:
|
||||
|
@ -65,7 +65,7 @@ class ProxmoxDeferredRemoval(jobs.Job):
|
||||
|
||||
@staticmethod
|
||||
def remove(provider_instance: 'provider.ProxmoxProvider', vmid: int, try_graceful_shutdown: bool) -> None:
|
||||
def storeDeferredRemoval() -> None:
|
||||
def store_for_deferred_removal() -> None:
|
||||
provider_instance.storage.save_to_db('tr' + str(vmid), f'{vmid}:{"y" if try_graceful_shutdown else "n"}', attr1='tRm')
|
||||
ProxmoxDeferredRemoval.counter += 1
|
||||
logger.debug('Adding %s from %s to defeffed removal process', vmid, provider_instance)
|
||||
@ -81,16 +81,16 @@ class ProxmoxDeferredRemoval(jobs.Job):
|
||||
# If running vm, simply stops it and wait for next
|
||||
provider_instance.stop_machine(vmid)
|
||||
|
||||
storeDeferredRemoval()
|
||||
store_for_deferred_removal()
|
||||
return
|
||||
|
||||
provider_instance.remove_machine(vmid) # Try to remove, launch removal, but check later
|
||||
storeDeferredRemoval()
|
||||
store_for_deferred_removal()
|
||||
|
||||
except client.ProxmoxNotFound:
|
||||
return # Machine does not exists
|
||||
except Exception as e:
|
||||
storeDeferredRemoval()
|
||||
store_for_deferred_removal()
|
||||
logger.info(
|
||||
'Machine %s could not be removed right now, queued for later: %s',
|
||||
vmid,
|
||||
|
@ -130,11 +130,5 @@ class ProxmoxPublication(DynamicPublication, autoserializable.AutoSerializable):
|
||||
def op_remove(self) -> None:
|
||||
self.service().remove_machine(self, self._vmid)
|
||||
|
||||
def cancel(self) -> types.states.TaskState:
|
||||
return self.destroy()
|
||||
|
||||
def error_reason(self) -> str:
|
||||
return self._reason
|
||||
|
||||
def machine(self) -> int:
|
||||
return int(self._vmid)
|
||||
|
@ -288,10 +288,12 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
return self.get_nic_mac(int(machine_id))
|
||||
|
||||
def start_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None:
|
||||
self.provider().start_machine(int(machine_id))
|
||||
if not self.is_machine_running(caller_instance, machine_id):
|
||||
self.provider().start_machine(int(machine_id))
|
||||
|
||||
def stop_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None:
|
||||
self.provider().stop_machine(int(machine_id))
|
||||
if self.is_machine_running(caller_instance, machine_id):
|
||||
self.provider().stop_machine(int(machine_id))
|
||||
|
||||
def is_machine_running(
|
||||
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str
|
||||
@ -304,5 +306,4 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
def remove_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None:
|
||||
# All removals are deferred, so we can do it async
|
||||
# Try to stop it if already running... Hard stop
|
||||
self.stop_machine(caller_instance, machine_id)
|
||||
jobs.ProxmoxDeferredRemoval.remove(self.provider(), int(machine_id), self.try_graceful_shutdown())
|
||||
|
@ -402,8 +402,10 @@ SERVICE_LINKED_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
|
||||
'gpu': VGPUS[0].type,
|
||||
'basename': 'base',
|
||||
'lenname': 4,
|
||||
'prov_uuid': '',
|
||||
}
|
||||
|
||||
|
||||
SERVICE_FIXED_VALUES_DICT: gui.ValuesDictType = {
|
||||
'token': '',
|
||||
'pool': POOLS[0].poolid,
|
||||
|
@ -36,19 +36,25 @@ from uds.core import types
|
||||
|
||||
from . import fixtures
|
||||
|
||||
from ...utils.test import UDSTestCase
|
||||
from ...utils.test import UDSTransactionTestCase
|
||||
from ...utils import MustBeOfType
|
||||
from ...utils.generators import limited_iterator
|
||||
|
||||
|
||||
class TestProxmovPublication(UDSTestCase):
|
||||
# USe transactional, used by publication access to db on "removal"
|
||||
class TestProxmovPublication(UDSTransactionTestCase):
|
||||
|
||||
def test_publication(self) -> None:
|
||||
with fixtures.patch_provider_api() as api:
|
||||
publication = fixtures.create_publication()
|
||||
|
||||
state = publication.publish()
|
||||
# Wait until types.services.Operation.CREATE_COMPLETED
|
||||
for _ in limited_iterator(lambda: publication._queue[0] != types.services.Operation.CREATE_COMPLETED, 10):
|
||||
state = publication.check_state()
|
||||
|
||||
self.assertEqual(state, types.states.State.RUNNING)
|
||||
api.clone_machine.assert_called_with(
|
||||
api.clone_machine.assert_called_once_with(
|
||||
publication.service().machine.as_int(),
|
||||
MustBeOfType(int),
|
||||
MustBeOfType(str),
|
||||
@ -59,61 +65,77 @@ class TestProxmovPublication(UDSTestCase):
|
||||
publication.service().pool.value,
|
||||
None,
|
||||
)
|
||||
running_task = fixtures.TASK_STATUS._replace(status='running')
|
||||
|
||||
api.get_task.return_value = running_task
|
||||
state = publication.check_state()
|
||||
self.assertEqual(state, types.states.State.RUNNING)
|
||||
# Now ensure task is finished
|
||||
api.get_task.return_value = fixtures.TASK_STATUS._replace(status='stopped', exitstatus='OK')
|
||||
# And should end in next call
|
||||
self.assertEqual(publication.check_state(), types.states.State.FINISHED)
|
||||
# Now, error
|
||||
publication._state = types.states.State.RUNNING
|
||||
# Must have vmid, and must match machine() result
|
||||
self.assertEqual(publication.machine(), int(publication._vmid))
|
||||
|
||||
|
||||
def test_publication_error(self) -> None:
|
||||
with fixtures.patch_provider_api() as api:
|
||||
publication = fixtures.create_publication()
|
||||
|
||||
# Ensure state check returns error
|
||||
api.get_task.return_value = fixtures.TASK_STATUS._replace(
|
||||
status='stopped', exitstatus='ERROR, BOOM!'
|
||||
)
|
||||
self.assertEqual(publication.check_state(), types.states.State.ERROR)
|
||||
|
||||
state = publication.publish()
|
||||
self.assertEqual(state, types.states.State.RUNNING, f'State is not running: publication._queue={publication._queue}')
|
||||
|
||||
# Wait until types.services.Operation.CREATE_COMPLETED
|
||||
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, 128):
|
||||
state = publication.check_state()
|
||||
|
||||
try:
|
||||
api.clone_machine.assert_called_once_with(
|
||||
publication.service().machine.as_int(),
|
||||
MustBeOfType(int),
|
||||
MustBeOfType(str),
|
||||
MustBeOfType(str),
|
||||
False,
|
||||
None,
|
||||
publication.service().datastore.value,
|
||||
publication.service().pool.value,
|
||||
None,
|
||||
)
|
||||
except AssertionError:
|
||||
self.fail(f'Clone machine not called: {api.mock_calls} // {publication._queue}')
|
||||
self.assertEqual(state, types.states.State.ERROR)
|
||||
self.assertEqual(publication.error_reason(), 'ERROR, BOOM!')
|
||||
|
||||
publication._vmid = str(fixtures.VMS_INFO[0].vmid)
|
||||
self.assertEqual(publication.machine(), fixtures.VMS_INFO[0].vmid)
|
||||
|
||||
def test_publication_destroy(self) -> None:
|
||||
vmid = str(fixtures.VMS_INFO[0].vmid)
|
||||
with fixtures.patch_provider_api() as api:
|
||||
publication = fixtures.create_publication()
|
||||
|
||||
# Destroy
|
||||
publication._state = types.states.State.RUNNING
|
||||
publication._vmid = vmid
|
||||
state = publication.destroy()
|
||||
self.assertEqual(state, types.states.State.RUNNING)
|
||||
self.assertEqual(publication._destroy_after, True)
|
||||
api.remove_machine.assert_called_once_with(publication.machine())
|
||||
|
||||
# Now, destroy again
|
||||
# Now, destroy again, should do nothing more
|
||||
state = publication.destroy()
|
||||
publication._vmid = vmid
|
||||
self.assertEqual(state, types.states.State.RUNNING)
|
||||
self.assertEqual(publication._destroy_after, False)
|
||||
self.assertEqual(publication._operation, 'd')
|
||||
self.assertEqual(publication._state, types.states.State.RUNNING)
|
||||
api.remove_machine.assert_called_with(publication.service().machine.as_int())
|
||||
# Should not call again
|
||||
api.remove_machine.assert_called_once_with(publication.machine())
|
||||
|
||||
# Now, repeat with finished state at the very beginning
|
||||
api.remove_machine.reset_mock()
|
||||
publication._state = types.states.State.FINISHED
|
||||
self.assertEqual(state, types.states.State.RUNNING)
|
||||
|
||||
|
||||
def test_publication_destroy_error(self) -> None:
|
||||
vmid = str(fixtures.VMS_INFO[0].vmid)
|
||||
with fixtures.patch_provider_api() as api:
|
||||
publication = fixtures.create_publication()
|
||||
|
||||
# Now, destroy in fact will not return error, because it will
|
||||
# queue the operation if failed, but api.remove_machine will be called anyway
|
||||
publication._vmid = vmid
|
||||
api.remove_machine.side_effect = Exception('BOOM!')
|
||||
publication._vmid = vmid
|
||||
self.assertEqual(publication.destroy(), types.states.State.RUNNING)
|
||||
self.assertEqual(publication._destroy_after, False)
|
||||
self.assertEqual(publication._operation, 'd')
|
||||
self.assertEqual(publication._state, types.states.State.RUNNING)
|
||||
api.remove_machine.assert_called_with(publication.service().machine.as_int())
|
||||
|
||||
# And now, with error
|
||||
api.remove_machine.side_effect = Exception('BOOM!')
|
||||
publication._state = types.states.State.FINISHED
|
||||
publication._vmid = vmid
|
||||
self.assertEqual(publication.destroy(), types.states.State.ERROR)
|
||||
self.assertEqual(publication.error_reason(), 'BOOM!')
|
||||
api.remove_machine.assert_called_once_with(publication.machine())
|
||||
|
||||
# Ensure cancel calls destroy
|
||||
with mock.patch.object(publication, 'destroy') as destroy:
|
||||
|
@ -30,12 +30,12 @@
|
||||
"""
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import pickle
|
||||
import typing
|
||||
|
||||
from tests.utils.test import UDSTestCase
|
||||
|
||||
from uds.core import types
|
||||
from uds.core.environment import Environment
|
||||
from uds.core.util import autoserializable
|
||||
|
||||
from uds.services.Proxmox.publication import ProxmoxPublication as Publication
|
||||
|
||||
@ -57,20 +57,19 @@ from uds.services.Proxmox.publication import ProxmoxPublication as Publication
|
||||
# ) = vals[1:]
|
||||
# else:
|
||||
# raise ValueError('Invalid data format')
|
||||
|
||||
|
||||
# self._destroy_after = destroy_after != ''
|
||||
EXPECTED_FIELDS: typing.Final[set[str]] = {
|
||||
'_name',
|
||||
'_vmid',
|
||||
'_task',
|
||||
'_state',
|
||||
'_operation',
|
||||
'_destroy_after',
|
||||
'_queue',
|
||||
'_is_flagged_for_destroy',
|
||||
'_reason',
|
||||
}
|
||||
|
||||
|
||||
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\tvm\ttask\tstate\toperation\ty\treason'
|
||||
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\tvm\ttask\tstate\td\ty\treason'
|
||||
|
||||
|
||||
class ProxmoxPublicationSerializationTest(UDSTestCase):
|
||||
@ -78,9 +77,18 @@ class ProxmoxPublicationSerializationTest(UDSTestCase):
|
||||
self.assertEqual(instance._name, 'name')
|
||||
self.assertEqual(instance._vmid, 'vm')
|
||||
self.assertEqual(instance._task, 'task')
|
||||
self.assertEqual(instance._state, 'state')
|
||||
self.assertEqual(instance._operation, 'operation')
|
||||
self.assertTrue(instance._destroy_after)
|
||||
# State is not used anymore on current publication, (it's the current queue top operation)
|
||||
# self.assertEqual(instance._state, 'state')
|
||||
self.assertEqual(
|
||||
instance._queue,
|
||||
[
|
||||
types.services.Operation.REMOVE,
|
||||
types.services.Operation.REMOVE_COMPLETED,
|
||||
types.services.Operation.FINISH,
|
||||
],
|
||||
)
|
||||
|
||||
self.assertTrue(instance._is_flagged_for_destroy)
|
||||
self.assertEqual(instance._reason, 'reason')
|
||||
|
||||
def test_marshaling(self) -> None:
|
||||
|
@ -112,8 +112,8 @@ class TestProxmovLinkedService(UDSTestCase):
|
||||
# Get nic mac
|
||||
self.assertEqual(service.get_nic_mac(1), '00:01:02:03:04:05')
|
||||
|
||||
# remove machine
|
||||
self.assertEqual(service.remove_machine(1), fixtures.UPID)
|
||||
# remove machine, but this is from provider
|
||||
self.assertEqual(service.provider().remove_machine(1), fixtures.UPID)
|
||||
|
||||
# Enable HA
|
||||
service.enable_machine_ha(1, True)
|
||||
|
Loading…
x
Reference in New Issue
Block a user