1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-22 22:03:54 +03:00

Fixing dynamic service specializations and tests

This commit is contained in:
Adolfo Gómez García 2024-03-22 01:08:17 +01:00
parent 165d3bde21
commit ccce6650ba
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
8 changed files with 263 additions and 145 deletions

View File

@ -146,22 +146,6 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
""" """
... ...
def is_machine_stopped(
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str
) -> bool:
"""
Returns if the machine is stopped
"""
return not self.is_machine_running(caller_instance, machine_id)
def is_machine_suspended(
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str
) -> bool:
"""
Returns if the machine is suspended
"""
return self.is_machine_stopped(caller_instance, machine_id)
@abc.abstractmethod @abc.abstractmethod
def start_machine( def start_machine(
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str

View File

@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com Author: Adolfo Gómez, dkmaster at dkmon dot com
""" """
import abc import abc
import functools
import logging import logging
import typing import typing
import collections.abc import collections.abc
@ -47,14 +48,24 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Decorator that tests that _vmid is not empty
# Used by some default methods that require a vmid to work
def must_have_vmid(fnc: typing.Callable[[typing.Any], None]) -> typing.Callable[['DynamicUserService'], None]:
@functools.wraps(fnc)
def wrapper(self: 'DynamicUserService') -> None:
if self._vmid == '':
raise Exception(f'No machine id on {self._name} for {fnc}')
return fnc(self)
return wrapper
class DynamicUserService(services.UserService, autoserializable.AutoSerializable, abc.ABC): class DynamicUserService(services.UserService, autoserializable.AutoSerializable, abc.ABC):
""" """
This class represents a fixed user service, that is, a service that is assigned to an user This class represents a fixed user service, that is, a service that is assigned to an user
and that will be always the from a "fixed" machine, that is, a machine that is not created. and that will be always the from a "fixed" machine, that is, a machine that is not created.
""" """
suggested_delay = 8
suggested_delay = 5
# Some customization fields # Some customization fields
# If ip can be manually overriden # If ip can be manually overriden
@ -68,6 +79,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
_name = autoserializable.StringField(default='') _name = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='') _mac = autoserializable.StringField(default='')
_ip = autoserializable.StringField(default='')
_vmid = autoserializable.StringField(default='') _vmid = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='') _reason = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]() # Default is empty list _queue = autoserializable.ListField[Operation]() # Default is empty list
@ -75,6 +87,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
# In order to allow migrating from old data, we will mark if the _queue has our format or the old one # In order to allow migrating from old data, we will mark if the _queue has our format or the old one
_queue_has_new_format = autoserializable.BoolField(default=False) _queue_has_new_format = autoserializable.BoolField(default=False)
# Extra info, not serializable, to keep information in case of exception and debug it
_error_debug_info: typing.Optional[str] = None
# Note that even if SNAPHSHOT operations are in middel # Note that even if SNAPHSHOT operations are in middel
# implementations may opt to no have snapshots at all # implementations may opt to no have snapshots at all
# In this case, the process_snapshot method will do nothing # In this case, the process_snapshot method will do nothing
@ -130,9 +145,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def _current_op(self) -> Operation: def _current_op(self) -> Operation:
""" """
Get the current operation from the queue Get the current operation from the queue
Checks that the queue is upgraded, and if not, migrates it Checks that the queue is upgraded, and if not, migrates it
Note: Note:
This method will be here for a while, to ensure future compat with old data. This method will be here for a while, to ensure future compat with old data.
Eventually, this mechanincs will be removed, but no date is set for that. Eventually, this mechanincs will be removed, but no date is set for that.
There is almos not penalty on keeping this here, as it's only an small check There is almos not penalty on keeping this here, as it's only an small check
@ -141,12 +156,12 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
if self._queue_has_new_format is False: if self._queue_has_new_format is False:
self.migrate_old_queue() self.migrate_old_queue()
self._queue_has_new_format = True self._queue_has_new_format = True
if not self._queue: if not self._queue:
return Operation.FINISH return Operation.FINISH
return self._queue[0] return self._queue[0]
def _set_queue(self, queue: list[Operation]) -> None: def _set_queue(self, queue: list[Operation]) -> None:
""" """
Sets the queue of tasks to be executed Sets the queue of tasks to be executed
@ -154,7 +169,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
self._queue = queue self._queue = queue
self._queue_has_new_format = True self._queue_has_new_format = True
def _retry_again(self) -> types.states.TaskState: def _retry_again(self) -> types.states.TaskState:
""" """
Retries the current operation Retries the current operation
@ -181,6 +196,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
Returns: Returns:
State.ERROR, so we can do "return self._error(reason)" State.ERROR, so we can do "return self._error(reason)"
""" """
self._error_debug_info = self._debug(repr(reason))
reason = str(reason) reason = str(reason)
logger.debug('Setting error state, reason: %s', reason) logger.debug('Setting error state, reason: %s', reason)
self.do_log(log.LogLevel.ERROR, reason) self.do_log(log.LogLevel.ERROR, reason)
@ -305,7 +321,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return types.states.TaskState.RUNNING return types.states.TaskState.RUNNING
except Exception as e: except Exception as e:
logger.exception('Unexpected FixedUserService exception: %s', e) logger.exception('Unexpected FixedUserService exception: %s', e)
return self._error(str(e)) return self._error(e)
def check_state(self) -> types.states.TaskState: def check_state(self) -> types.states.TaskState:
""" """
@ -336,7 +352,8 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
if op.is_custom(): if op.is_custom():
state = self.op_custom_checker(op) state = self.op_custom_checker(op)
else: else:
state = _CHECKERS[op](self) operation_checker = _CHECKERS[op]
state = getattr(self, operation_checker.__name__)()
if state == types.states.TaskState.FINISHED: if state == types.states.TaskState.FINISHED:
# Remove finished operation from queue # Remove finished operation from queue
@ -346,7 +363,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return state return state
except Exception as e: except Exception as e:
return self._error(e) return self._error(e)
@typing.final @typing.final
def destroy(self) -> types.states.TaskState: def destroy(self) -> types.states.TaskState:
""" """
@ -358,8 +375,14 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
if op == Operation.ERROR: if op == Operation.ERROR:
return self._error('Machine is already in error state!') return self._error('Machine is already in error state!')
shutdown_operations: list[Operation] = [] if not self.service().try_graceful_shutdown() else [Operation.SHUTDOWN, Operation.SHUTDOWN_COMPLETED] shutdown_operations: list[Operation] = (
destroy_operations = shutdown_operations + self._destroy_queue # copy is not needed due to list concatenation []
if not self.service().try_graceful_shutdown()
else [Operation.SHUTDOWN, Operation.SHUTDOWN_COMPLETED]
)
destroy_operations = (
[Operation.DESTROY_VALIDATOR] + shutdown_operations + self._destroy_queue
) # copy is not needed due to list concatenation
# If a "paused" state, reset queue to destroy # If a "paused" state, reset queue to destroy
if op in (Operation.FINISH, Operation.WAIT): if op in (Operation.FINISH, Operation.WAIT):
@ -375,7 +398,6 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
# Do not execute anything.here, just continue normally # Do not execute anything.here, just continue normally
return types.states.TaskState.RUNNING return types.states.TaskState.RUNNING
# Execution methods # Execution methods
# Every Operation has an execution method and a check method # Every Operation has an execution method and a check method
def op_initialize(self) -> None: def op_initialize(self) -> None:
@ -396,6 +418,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
pass pass
@must_have_vmid
def op_start(self) -> None: def op_start(self) -> None:
""" """
This method is called when the service is started This method is called when the service is started
@ -408,6 +431,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
pass pass
@must_have_vmid
def op_stop(self) -> None: def op_stop(self) -> None:
""" """
This method is called for stopping the service This method is called for stopping the service
@ -420,6 +444,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
pass pass
@must_have_vmid
def op_shutdown(self) -> None: def op_shutdown(self) -> None:
""" """
This method is called for shutdown the service This method is called for shutdown the service
@ -429,10 +454,10 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
if not is_running: if not is_running:
# Already stopped, just finish # Already stopped, just finish
return return
self.service().shutdown_machine(self, self._vmid) self.service().shutdown_machine(self, self._vmid)
shutdown_stamp = sql_stamp_seconds() shutdown_stamp = sql_stamp_seconds()
with self.storage.as_dict() as data: with self.storage.as_dict() as data:
data['shutdown'] = shutdown_stamp data['shutdown'] = shutdown_stamp
@ -442,12 +467,13 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
pass pass
@must_have_vmid
def op_suspend(self) -> None: def op_suspend(self) -> None:
""" """
This method is called for suspend the service This method is called for suspend the service
""" """
# Note that by default suspend is "shutdown" and not "stop" because we # Note that by default suspend is "shutdown" and not "stop" because we
self.service().suspend_machine(self, self._vmid) self.op_shutdown()
def op_suspend_completed(self) -> None: def op_suspend_completed(self) -> None:
""" """
@ -455,18 +481,20 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
pass pass
@must_have_vmid
def op_reset(self) -> None: def op_reset(self) -> None:
""" """
This method is called when the service is reset This method is called when the service is reset
""" """
pass self.service().reset_machine(self, self._vmid)
def op_reset_completed(self) -> None: def op_reset_completed(self) -> None:
""" """
This method is called when the service reset is completed This method is called when the service reset is completed
""" """
self.service().reset_machine(self, self._vmid) pass
@must_have_vmid
def op_remove(self) -> None: def op_remove(self) -> None:
""" """
This method is called when the service is removed This method is called when the service is removed
@ -493,6 +521,15 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
This does nothing, as it's a NOP operation This does nothing, as it's a NOP operation
""" """
pass pass
def op_destroy_validator(self) -> None:
"""
This method is called to check if the userservice has an vmid to stop destroying it if needed
"""
# If does not have vmid, we can finish right now
if self._vmid == '':
self._set_queue([Operation.FINISH]) # so we can finish right now
return
def op_custom(self, operation: Operation) -> None: def op_custom(self, operation: Operation) -> None:
""" """
@ -524,7 +561,10 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
This method is called to check if the service is started This method is called to check if the service is started
""" """
return types.states.TaskState.FINISHED if self.service().is_machine_running(self, self._vmid):
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def op_start_completed_checker(self) -> types.states.TaskState: def op_start_completed_checker(self) -> types.states.TaskState:
""" """
@ -536,7 +576,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
This method is called to check if the service is stopped This method is called to check if the service is stopped
""" """
return types.states.TaskState.FINISHED if self.service().is_machine_running(self, self._vmid) is False:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def op_stop_completed_checker(self) -> types.states.TaskState: def op_stop_completed_checker(self) -> types.states.TaskState:
""" """
@ -552,7 +594,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
with self.storage.as_dict() as data: with self.storage.as_dict() as data:
shutdown_start = data.get('shutdown', -1) shutdown_start = data.get('shutdown', -1)
logger.debug('Shutdown start: %s', shutdown_start) logger.debug('Shutdown start: %s', shutdown_start)
if shutdown_start < 0: # Was already stopped if shutdown_start < 0: # Was already stopped
# Machine is already stop # Machine is already stop
logger.debug('Machine WAS stopped') logger.debug('Machine WAS stopped')
@ -592,7 +634,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
""" """
This method is called to check if the service is suspended This method is called to check if the service is suspended
""" """
return types.states.TaskState.FINISHED return self.op_shutdown_checker()
def op_suspend_completed_checker(self) -> types.states.TaskState: def op_suspend_completed_checker(self) -> types.states.TaskState:
""" """
@ -629,6 +671,13 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
This method is called to check if the service is doing nothing This method is called to check if the service is doing nothing
""" """
return types.states.TaskState.FINISHED return types.states.TaskState.FINISHED
def op_destroy_validator_checker(self) -> types.states.TaskState:
"""
This method is called to check if the userservice has an vmid to stop destroying it if needed
"""
# If does not have vmid, we can finish right now
return types.states.TaskState.FINISHED # If we are here, we have a vmid
def op_custom_checker(self, operation: Operation) -> types.states.TaskState: def op_custom_checker(self, operation: Operation) -> types.states.TaskState:
""" """
@ -637,7 +686,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return types.states.TaskState.FINISHED return types.states.TaskState.FINISHED
# ERROR, FINISH and UNKNOWN are not here, as they are final states not needing to be checked # ERROR, FINISH and UNKNOWN are not here, as they are final states not needing to be checked
def migrate_old_queue(self) -> None: def migrate_old_queue(self) -> None:
""" """
If format has to be converted, override this method and do the conversion here If format has to be converted, override this method and do the conversion here
@ -649,15 +698,8 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def _op2str(op: Operation) -> str: def _op2str(op: Operation) -> str:
return op.name return op.name
def _debug(self, txt: str) -> None: def _debug(self, txt: str) -> str:
logger.debug( return f'Queue at {txt} for {self._name}: {", ".join([DynamicUserService._op2str(op) for op in self._queue])}, mac:{self._mac}, vmId:{self._vmid}'
'Queue at %s for %s: %s, mac:%s, vmId:%s',
txt,
self._name,
[DynamicUserService._op2str(op) for op in self._queue],
self._mac,
self._vmid,
)
# This is a map of operations to methods # This is a map of operations to methods
@ -682,6 +724,7 @@ _EXECUTORS: typing.Final[
Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed, Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed,
Operation.WAIT: DynamicUserService.op_wait, Operation.WAIT: DynamicUserService.op_wait,
Operation.NOP: DynamicUserService.op_nop, Operation.NOP: DynamicUserService.op_nop,
Operation.DESTROY_VALIDATOR: DynamicUserService.op_destroy_validator,
} }
# Same af before, but for check methods # Same af before, but for check methods
@ -703,4 +746,5 @@ _CHECKERS: typing.Final[
Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed_checker, Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed_checker,
Operation.WAIT: DynamicUserService.op_wait_checker, Operation.WAIT: DynamicUserService.op_wait_checker,
Operation.NOP: DynamicUserService.op_nop_checker, Operation.NOP: DynamicUserService.op_nop_checker,
Operation.DESTROY_VALIDATOR: DynamicUserService.op_destroy_validator_checker,
} }

View File

@ -144,7 +144,11 @@ class Operation(enum.IntEnum):
WAIT = 1100 WAIT = 1100
NOP = 1101 NOP = 1101
# Custom validations
DESTROY_VALIDATOR = 1102 # Check if the userservice has an vmid to stop destroying it if needed
# Final operations
ERROR = 9000 ERROR = 9000
FINISH = 9900 FINISH = 9900
UNKNOWN = 9999 UNKNOWN = 9999

View File

@ -35,7 +35,7 @@ import enum
import logging import logging
import typing import typing
from uds.core import types from uds.core import types, consts
from uds.core.services.specializations.dynamic_machine.dynamic_userservice import DynamicUserService, Operation from uds.core.services.specializations.dynamic_machine.dynamic_userservice import DynamicUserService, Operation
from uds.core.managers.userservice import UserServiceManager from uds.core.managers.userservice import UserServiceManager
from uds.core.util import autoserializable from uds.core.util import autoserializable
@ -103,7 +103,7 @@ class OldOperation(enum.IntEnum):
# UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state') # UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state')
class ProxmoxUserserviceLinked(DynamicUserService, autoserializable.AutoSerializable): class ProxmoxUserserviceLinked(DynamicUserService):
""" """
This class generates the user consumable elements of the service tree. This class generates the user consumable elements of the service tree.
@ -115,26 +115,34 @@ class ProxmoxUserserviceLinked(DynamicUserService, autoserializable.AutoSerializ
""" """
# : Recheck every this seconds by default (for task methods)
suggested_delay = 12
_task = autoserializable.StringField(default='') _task = autoserializable.StringField(default='')
# own vars
# _name: str
# _ip: str
# _mac: str
# _task: str
# _vmid: str
# _reason: str
# _queue: list[int]
def _store_task(self, upid: 'client.types.UPID') -> None: def _store_task(self, upid: 'client.types.UPID') -> None:
self._task = ','.join([upid.node, upid.upid]) self._task = ','.join([upid.node, upid.upid])
def _retrieve_task(self) -> tuple[str, str]: def _retrieve_task(self) -> tuple[str, str]:
vals = self._task.split(',') vals = self._task.split(',')
return (vals[0], vals[1]) return (vals[0], vals[1])
def _check_task_finished(self) -> types.states.TaskState:
if self._task == '':
return types.states.TaskState.FINISHED
node, upid = self._retrieve_task()
try:
task = self.service().provider().get_task_info(node, upid)
except client.ProxmoxConnectionError:
return types.states.TaskState.RUNNING # Try again later
if task.is_errored():
return self._error(task.exitstatus)
if task.is_completed():
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def service(self) -> 'ProxmoxServiceLinked': def service(self) -> 'ProxmoxServiceLinked':
return typing.cast('ProxmoxServiceLinked', super().service()) return typing.cast('ProxmoxServiceLinked', super().service())
@ -160,30 +168,54 @@ class ProxmoxUserserviceLinked(DynamicUserService, autoserializable.AutoSerializ
self._task = vals[4].decode('utf8') self._task = vals[4].decode('utf8')
self._vmid = vals[5].decode('utf8') self._vmid = vals[5].decode('utf8')
self._reason = vals[6].decode('utf8') self._reason = vals[6].decode('utf8')
self._queue = [Operation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controled data # Load from old format and convert to new one directly
self._queue = [
OldOperation.from_int(i).to_operation() for i in pickle.loads(vals[7])
] # nosec: controled data
# Also, mark as it is using new queue format
self._queue_has_new_format = True
self.mark_for_upgrade() # Flag so manager can save it again with new format self.mark_for_upgrade() # Flag so manager can save it again with new format
def op_reset(self) -> None: def op_reset(self) -> None:
if self._vmid: if self._vmid:
self.service().provider().reset_machine(int(self._vmid)) self.service().provider().reset_machine(int(self._vmid))
# No need for op_reset_checker
def op_create(self) -> None: def op_create(self) -> None:
return super().op_create() template_id = self.publication().machine()
name = self.get_name()
if name == consts.NO_MORE_NAMES:
raise Exception(
'No more names available for this service. (Increase digits for this service to fix)'
)
comments = 'UDS Linked clone'
task_result = self.service().clone_machine(name, comments, template_id)
self._store_task(task_result.upid)
self._vmid = str(task_result.vmid)
def op_create_checker(self) -> types.states.TaskState:
return self._check_task_finished()
def op_create_completed(self) -> None: def op_create_completed(self) -> None:
# Retreive network info and store it # Set mac
return super().op_create_completed() try:
# Note: service will only enable ha if it is configured to do so
self.service().enable_machine_ha(int(self._vmid), True) # Enable HA before continuing here
def op_start(self) -> None: # Set vm mac address now on first interface
return super().op_start() self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
except client.ProxmoxConnectionError:
self._retry_again() # Push nop to front of queue, so it is consumed instead of this one
return
except Exception as e:
logger.exception('Setting HA and MAC on proxmox')
raise Exception(f'Error setting MAC and HA on proxmox: {e}') from e
def op_stop(self) -> None: # No need for op_create_completed_checker
return super().op_stop()
def op_shutdown(self) -> None:
return super().op_shutdown()
def get_console_connection( def get_console_connection(
self, self,
) -> typing.Optional[types.services.ConsoleConnectionInfo]: ) -> typing.Optional[types.services.ConsoleConnectionInfo]:

View File

@ -288,12 +288,33 @@ class ProxmoxServiceLinked(DynamicService):
return self.get_nic_mac(int(machine_id)) return self.get_nic_mac(int(machine_id))
def start_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None: def start_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None:
if not self.is_machine_running(caller_instance, machine_id): if isinstance(caller_instance, ProxmoxUserserviceLinked):
self.provider().start_machine(int(machine_id)) if not self.is_machine_running(caller_instance, machine_id): # If not running, start it
caller_instance._task = ''
else:
caller_instance._store_task(self.provider().start_machine(int(machine_id)))
else:
raise Exception('Invalid caller instance (publication) for start_machine()')
def stop_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None: def stop_machine(self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str) -> None:
if self.is_machine_running(caller_instance, machine_id): if isinstance(caller_instance, ProxmoxUserserviceLinked):
self.provider().stop_machine(int(machine_id)) if self.is_machine_running(caller_instance, machine_id):
caller_instance._store_task(self.provider().stop_machine(int(machine_id)))
else:
caller_instance._task = ''
else:
raise Exception('Invalid caller instance (publication) for stop_machine()')
def shutdown_machine(
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str
) -> None:
if isinstance(caller_instance, ProxmoxUserserviceLinked):
if self.is_machine_running(caller_instance, machine_id):
caller_instance._store_task(self.provider().shutdown_machine(int(machine_id)))
else:
caller_instance._task = ''
else:
raise Exception('Invalid caller instance (publication) for shutdown_machine()')
def is_machine_running( def is_machine_running(
self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str self, caller_instance: 'DynamicUserService | DynamicPublication', machine_id: str

View File

@ -33,13 +33,17 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import pickle import pickle
import typing import typing
from uds.core import environment, types
from uds.services.Proxmox.deployment_linked import (
OldOperation as OldOperation,
ProxmoxUserserviceLinked as Deployment,
)
# We use storage, so we need transactional tests # We use storage, so we need transactional tests
from ...utils.test import UDSTransactionTestCase from ...utils.test import UDSTransactionTestCase
from ...utils import fake from ...utils import fake
from . import fixtures
from uds.core.environment import Environment
from uds.services.Proxmox.deployment_linked import Operation as Operation, ProxmoxUserserviceLinked as Deployment
# if not data.startswith(b'v'): # if not data.startswith(b'v'):
@ -53,11 +57,12 @@ from uds.services.Proxmox.deployment_linked import Operation as Operation, Proxm
# self._task = vals[4].decode('utf8') # self._task = vals[4].decode('utf8')
# self._vmid = vals[5].decode('utf8') # self._vmid = vals[5].decode('utf8')
# self._reason = vals[6].decode('utf8') # self._reason = vals[6].decode('utf8')
# self._queue = [Operation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controled data # self._queue = [OldOperation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controled data
# self.flag_for_upgrade() # Flag so manager can save it again with new format # self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = { # Note that new implementation can hold more fields than ours, so we need to check only the ones we need
EXPECTED_OWN_FIELDS: typing.Final[set[str]] = {
'_name', '_name',
'_ip', '_ip',
'_mac', '_mac',
@ -67,12 +72,15 @@ EXPECTED_FIELDS: typing.Final[set[str]] = {
'_queue', '_queue',
} }
TEST_QUEUE: typing.Final[list[Operation]] = [ # Old queue content and format
Operation.CREATE, TEST_QUEUE: typing.Final[list[OldOperation]] = [
Operation.REMOVE, OldOperation.CREATE,
Operation.RETRY, OldOperation.REMOVE,
OldOperation.RETRY,
] ]
TEST_QUEUE_NEW: typing.Final[list[types.services.Operation]] = [i.to_operation() for i in TEST_QUEUE]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = { SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01task\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE, protocol=0), 'v1': b'v1\x01name\x01ip\x01mac\x01task\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE, protocol=0),
} }
@ -88,14 +96,11 @@ class ProxmoxDeploymentSerializationTest(UDSTransactionTestCase):
self.assertEqual(instance._task, 'task') self.assertEqual(instance._task, 'task')
self.assertEqual(instance._vmid, 'vmid') self.assertEqual(instance._vmid, 'vmid')
self.assertEqual(instance._reason, 'reason') self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._queue, TEST_QUEUE) self.assertEqual(instance._queue, TEST_QUEUE_NEW)
def test_marshaling(self) -> None: def test_marshaling(self) -> None:
# queue is kept on "storage", so we need always same environment
environment = Environment.testing_environment()
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment: def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=fake.fake_service()) instance = fixtures.create_userservice_linked()
if unmarshal_data: if unmarshal_data:
instance.unmarshal(unmarshal_data) instance.unmarshal(unmarshal_data)
return instance return instance
@ -120,56 +125,57 @@ class ProxmoxDeploymentSerializationTest(UDSTransactionTestCase):
self.check(version, instance) self.check(version, instance)
def test_marshaling_queue(self) -> None: def test_marshaling_queue(self) -> None:
# queue is kept on "storage", so we need always same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.save_pickled('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment: def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=fake.fake_service()) instance = fixtures.create_userservice_linked()
instance.env.storage.save_pickled('queue', TEST_QUEUE)
if unmarshal_data: if unmarshal_data:
instance.unmarshal(unmarshal_data) instance.unmarshal(unmarshal_data)
return instance return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION]) instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE) self.assertEqual(instance._queue, TEST_QUEUE_NEW)
instance._queue = [ # Ensure that has been imported already
Operation.CREATE, self.assertEqual(instance._queue_has_new_format, True)
Operation.FINISH,
] # Now, access current operation, that will trigger the upgrade
instance._current_op()
self.assertEqual(instance._queue_has_new_format, True)
# And essure quee is as new format should be
self.assertEqual(instance._queue, TEST_QUEUE_NEW)
# Marshal and check again
marshaled_data = instance.marshal() marshaled_data = instance.marshal()
# Now, format is new, so we can't check it with old format
self.assertEqual(marshaled_data.startswith(b'v'), False)
instance = _create_instance(marshaled_data) instance = _create_instance(marshaled_data)
self.assertEqual( self.assertEqual(
instance._queue, instance._queue,
[Operation.CREATE, Operation.FINISH], TEST_QUEUE_NEW,
) )
self.assertEqual(instance._queue_has_new_format, True)
# Append something remarshall and check # Append something remarshall and check
instance._queue.insert(0, Operation.RETRY) instance._queue.insert(0, types.services.Operation.RESET)
marshaled_data = instance.marshal() marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data) instance = _create_instance(marshaled_data)
self.assertEqual( self.assertEqual(
instance._queue, instance._queue,
[ [types.services.Operation.RESET] + TEST_QUEUE_NEW,
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
],
)
# Remove something remarshall and check
instance._queue.pop(0)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
) )
self.assertEqual(instance._queue_has_new_format, True)
def test_autoserialization_fields(self) -> None: def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable # This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, this tests will warn us about it to fix the rest of the related tests # If some field is added or removed, this tests will warn us about it to fix the rest of the related tests
with Environment.temporary_environment() as env: with environment.Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=fake.fake_service()) instance = Deployment(environment=env, service=fake.fake_service())
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS) self.assertTrue(
EXPECTED_OWN_FIELDS <= set(f[0] for f in instance._autoserializable_fields()),
'Missing fields: '
+ str(EXPECTED_OWN_FIELDS - set(f[0] for f in instance._autoserializable_fields())),
)

View File

@ -57,11 +57,12 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
# patch userservice db_obj() method to return a mock # patch userservice db_obj() method to return a mock
userservice_db = mock.MagicMock() userservice_db = mock.MagicMock()
userservice.db_obj = mock.MagicMock(return_value=userservice_db) userservice.db_obj = mock.MagicMock(return_value=userservice_db)
# Test Deploy for cache, should raise Exception due # Test Deploy for cache, should set to error due
# to the fact fixed services cannot have cached items # to the fact fixed services cannot have cached items
with self.assertRaises(Exception): state = userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
userservice.deploy_for_cache(level=types.services.CacheLevel.L1) self.assertEqual(state, types.states.TaskState.ERROR)
# Test Deploy for user
state = userservice.deploy_for_user(models.User()) state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING) self.assertEqual(state, types.states.TaskState.RUNNING)

View File

@ -34,7 +34,7 @@ from unittest import mock
from uds import models from uds import models
from uds.core import types from uds.core import types
from uds.services.Proxmox.deployment_linked import Operation
from . import fixtures from . import fixtures
@ -49,7 +49,6 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
fixtures.VMS_INFO = [ fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='stopped') for i in range(len(fixtures.VMS_INFO)) fixtures.VMS_INFO[i]._replace(status='stopped') for i in range(len(fixtures.VMS_INFO))
] ]
def test_userservice_linked_cache_l1(self) -> None: def test_userservice_linked_cache_l1(self) -> None:
""" """
@ -120,9 +119,9 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state() state = userservice.check_state()
# If first item in queue is WAIT, we must "simulate" the wake up from os manager # If first item in queue is WAIT, we must "simulate" the wake up from os manager
if userservice._queue[0] == Operation.WAIT: if userservice._queue[0] == types.services.Operation.WAIT:
state = userservice.process_ready_from_os_manager(None) state = userservice.process_ready_from_os_manager(None)
self.assertEqual(state, types.states.TaskState.FINISHED) self.assertEqual(state, types.states.TaskState.FINISHED)
@ -152,8 +151,8 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
api.set_machine_mac.assert_called_with(vmid, userservice._mac) api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True) api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
# Now, called should not have been called because machine is running # Now, start should have been called
# api.start_machine.assert_called_with(vmid) api.start_machine.assert_called_with(vmid)
# Stop machine should have been called # Stop machine should have been called
api.shutdown_machine.assert_called_with(vmid) api.shutdown_machine.assert_called_with(vmid)
@ -175,7 +174,11 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state() state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED) self.assertEqual(
state,
types.states.TaskState.FINISHED,
f'Queue: {userservice._queue}, reason: {userservice._reason}, extra_info: {userservice._error_debug_info}',
)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename()) self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname()) self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
@ -202,15 +205,15 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
api.set_machine_mac.assert_called_with(vmid, userservice._mac) api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True) api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_machine.assert_called_with(vmid) api.start_machine.assert_called_with(vmid)
# Set ready state with the valid machine # Set ready state with the valid machine
state = userservice.set_ready() state = userservice.set_ready()
# Machine is stopped, so task must be RUNNING (opossed to FINISHED) # Machine is stopped, so task must be RUNNING (opossed to FINISHED)
self.assertEqual(state, types.states.TaskState.RUNNING) self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32): for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=32):
state = userservice.check_state() state = userservice.check_state()
# Should be finished now # Should be finished now
self.assertEqual(state, types.states.TaskState.FINISHED) self.assertEqual(state, types.states.TaskState.FINISHED)
@ -218,7 +221,7 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
""" """
Test the user service Test the user service
""" """
with fixtures.patch_provider_api() as _api: with fixtures.patch_provider_api() as api:
for graceful in [True, False]: for graceful in [True, False]:
userservice = fixtures.create_userservice_linked() userservice = fixtures.create_userservice_linked()
service = userservice.service() service = userservice.service()
@ -235,19 +238,42 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
self.assertEqual(state, types.states.TaskState.RUNNING) self.assertEqual(state, types.states.TaskState.RUNNING)
current_op = userservice._get_current_op()
# Invoke cancel # Invoke cancel
api.reset_mock()
state = userservice.cancel() state = userservice.cancel()
self.assertEqual(state, types.states.TaskState.RUNNING) self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure DESTROY_VALIDATOR is in the queue
self.assertIn(types.services.Operation.DESTROY_VALIDATOR, userservice._queue)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
# Now, should be finished without any problem, no call to api should have been done
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(len(api.mock_calls), 0)
# Now again, but process check_queue a couple of times before cancel
# we we have an _vmid
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
if userservice._vmid:
break
self.assertEqual( current_op = userservice._current_op()
userservice._queue, state = userservice.cancel()
[current_op] self.assertEqual(state, types.states.TaskState.RUNNING)
+ ([Operation.GRACEFUL_STOP] if graceful else []) self.assertEqual(userservice._queue[0], current_op)
+ [Operation.STOP, Operation.REMOVE, Operation.FINISH], if graceful:
) self.assertIn(types.services.Operation.SHUTDOWN, userservice._queue)
self.assertIn(types.services.Operation.SHUTDOWN_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.STOP, userservice._queue)
self.assertIn(types.services.Operation.STOP_COMPLETED, userservice._queue)
self.assertIn(types.services.Operation.REMOVE, userservice._queue)
self.assertIn(types.services.Operation.REMOVE_COMPLETED, userservice._queue)
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128): for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state() state = userservice.check_state()
@ -261,6 +287,6 @@ class TestProxmovLinkedService(UDSTransactionTestCase):
self.assertEqual(state, types.states.TaskState.FINISHED) self.assertEqual(state, types.states.TaskState.FINISHED)
if graceful: if graceful:
_api.shutdown_machine.assert_called() api.shutdown_machine.assert_called()
else: else:
_api.stop_machine.assert_called() api.stop_machine.assert_called()