mirror of https://github.com/dkmstr/openuds.git synced 2025-11-05 16:24:21 +03:00

Moved all marshallers to automatic methods (using UserInterface or AutoSerializable fields...)

This commit is contained in:
Adolfo Gómez García
2024-01-30 03:49:36 +01:00
parent 4a92cdb1e5
commit 06f3487d2c
62 changed files with 1967 additions and 672 deletions

View File

@@ -264,7 +264,7 @@ class RegexLdap(auths.Authenticator):
self._certificate,
) = vals[11:]
self.flag_for_upgrade() # Old version, so flag for upgrade if possible
self.mark_for_upgrade() # Old version, so flag for upgrade if possible
def _stablish_connection(self) -> 'ldaputil.LDAPObject':
"""

View File

@@ -228,7 +228,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
if vals[0] == 'v2':
(self.mfa_attribute.value, self.verify_ssl.value, self.certificate.value) = vals[14:17]
self.flag_for_upgrade()
self.mark_for_upgrade()
def mfaStorageKey(self, username: str) -> str:
return 'mfa_' + str(self.db_obj().uuid) + username

View File

@@ -279,7 +279,7 @@ class Module(abc.ABC, UserInterface, Environmentable, Serializable):
data serialized using serializeForm
"""
if self.deserialize_fields(data): # If upgrade of format requested
self.flag_for_upgrade() # Flag for upgrade
self.mark_for_upgrade() # Flag for upgrade
def check(self) -> str:
"""

View File

@@ -111,7 +111,7 @@ class Serializable:
# For remarshalling purposes
# These facilitate a faster migration of old data formats to new ones,
# allowing us to remove old format support as soon as possible
def flag_for_upgrade(self, value: bool = True) -> None:
def mark_for_upgrade(self, value: bool = True) -> None:
"""
Flags this object for remarshalling
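The contract behind the rename is worth spelling out: an object that has just read data in a legacy layout flags itself, and whoever owns the serialized blob re-saves it in the current format. A minimal sketch of that contract follows; it is illustrative only (LegacyAwareSerializable and its fields are made up, not the UDS base class):

# Minimal sketch of the remarshalling contract, not the actual UDS Serializable.
class LegacyAwareSerializable:
    def __init__(self) -> None:
        self._value = ''
        self._needs_upgrade = False

    def mark_for_upgrade(self, value: bool = True) -> None:
        # Flag this object so its owner re-serializes it in the new format
        self._needs_upgrade = value

    def needs_upgrade(self) -> bool:
        return self._needs_upgrade

    def unmarshal(self, data: bytes) -> None:
        if data.startswith(b'v1'):      # old, hand-written layout
            self._value = data[2:].decode()
            self.mark_for_upgrade()     # ask the owner to save us again
        else:                           # already in the new layout
            self._value = data.decode()

    def marshal(self) -> bytes:
        return self._value.encode()     # always writes the new layout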

View File

@@ -1504,7 +1504,7 @@ class UserInterface(metaclass=UserInterfaceAbstract):
# if required, we can use field.old_field_name(), but better
fields = [
(field_name, field.type.name, FIELDS_ENCODERS[field.type](field))
for field_name, field in self._gui.items()
for field_name, field in self._all_serializable_fields()
if FIELDS_ENCODERS[field.type](field) is not None
]
@@ -1516,14 +1516,28 @@ class UserInterface(metaclass=UserInterfaceAbstract):
) -> bool:
"""New form unserialization
Arguments:
values {bytes} -- serialized form (zipped)
Args:
values: list of serialized (as bytes) values
Returns:
True if should upgrade to new format, False if not
bool -- True if values were unserialized using OLD method, False if using new one
Keyword Arguments:
serializer {typing.Optional[collections.abc.Callable[[str], typing.Any]]} -- deserializer (default: {None})
Note:
If returns True, the manager will try to remarshall the values using the new method
Note:
The format of serialized fields is:
.. code-block:: python
SERIALIZATION_HEADER + SERIALIZATION_VERSION + serializer.serialize(fields)
Where:
* SERIALIZATION_HEADER: b'GUIZ' (header)
* SERIALIZATION_VERSION: b'\001' (serialization version, currently 1)
* fields: list of tuples (field_name, field_type, field_value)
* serializer: serializer used (custom one that pickles, compresses and encrypts data)
"""
if not values:
@@ -1551,8 +1565,7 @@ class UserInterface(metaclass=UserInterfaceAbstract):
field_names_translations: dict[str, str] = self._get_fieldname_translations()
# Set all values to defaults ones
for field_name in self._gui:
field = self._gui[field_name]
for field_name, field in self._all_serializable_fields():
if (
field.is_type(types.ui.FieldType.HIDDEN)
and field.is_serializable() is False
@@ -1670,10 +1683,14 @@ class UserInterface(metaclass=UserInterfaceAbstract):
return found_errors
def _all_serializable_fields(self) -> collections.abc.Iterable[tuple[str, gui.InputField]]:
for k, field in self._gui.items():
yield (k, field)
def _get_fieldname_translations(self) -> dict[str, str]:
# Dict of translations from old_field_name to field_name
field_names_translations: dict[str, str] = {}
for fld_name, fld in self._gui.items():
for fld_name, fld in self._all_serializable_fields():
fld_old_field_name = fld.old_field_name()
if fld_old_field_name and fld_old_field_name != fld_name:
field_names_translations[fld_old_field_name] = fld_name
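The docstring above fully pins down the new on-wire layout: the b'GUIZ' header, a one-byte version, then the serializer's output for a list of (field_name, field_type, field_value) tuples. A rough, self-contained sketch of that layout, using pickle plus zlib as a stand-in for the real serializer (which, per the note, also encrypts) and made-up field names:

import pickle
import zlib

SERIALIZATION_HEADER = b'GUIZ'    # per the docstring above
SERIALIZATION_VERSION = b'\x01'

def pack_fields(fields: list[tuple[str, str, str]]) -> bytes:
    # Stand-in serializer: the real one also encrypts the payload
    return SERIALIZATION_HEADER + SERIALIZATION_VERSION + zlib.compress(pickle.dumps(fields))

def unpack_fields(data: bytes) -> list[tuple[str, str, str]]:
    if not data.startswith(SERIALIZATION_HEADER):
        raise ValueError('Not new-format data; caller should fall back to the legacy path')
    payload = data[len(SERIALIZATION_HEADER) + len(SERIALIZATION_VERSION):]  # skip header + version
    return pickle.loads(zlib.decompress(payload))  # nosec: illustrative only

fields = [('host', 'text', 'ldap.example.com'), ('port', 'numeric', '636')]
assert unpack_fields(pack_fields(fields)) == fields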

View File

@@ -120,7 +120,6 @@ def is_autoserializable_data(data: bytes) -> bool:
"""
return data[: len(HEADER_BASE)] == HEADER_BASE
@dataclasses.dataclass(slots=True)
class _MarshalInfo:
name: str
@@ -449,7 +448,7 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
_fields: dict[str, typing.Any]
def _all_fields_attrs(self) -> collections.abc.Iterator[tuple[str, typing.Any]]:
def _autoserializable_fields(self) -> collections.abc.Iterator[tuple[str, _SerializableField]]:
"""Returns an iterator over all fields in the class, including inherited ones
Returns:
@@ -501,7 +500,7 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
# Iterate over own members and extract fields
fields: list[_MarshalInfo] = [
_MarshalInfo(name=v.name, type_name=str(v.__class__.__name__), value=v.marshal(self))
for _, v in self._all_fields_attrs()
for _, v in self._autoserializable_fields()
]
# Serialized data is:
@@ -543,7 +542,7 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
field, data = _MarshalInfo.unmarshal(data)
fields[field.name] = field
for _, v in self._all_fields_attrs():
for _, v in self._autoserializable_fields():
if isinstance(v, _SerializableField):
if v.name in fields:
if fields[v.name].type_name == str(v.__class__.__name__):
@@ -568,9 +567,9 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
if not isinstance(other, AutoSerializable):
return False
all_fields_attrs = list(self._all_fields_attrs())
all_fields_attrs = list(self._autoserializable_fields())
if {k for k, _ in all_fields_attrs} != {k for k, _ in other._all_fields_attrs()}:
if {k for k, _ in all_fields_attrs} != {k for k, _ in other._autoserializable_fields()}:
return False
for k, v in all_fields_attrs:
@@ -584,7 +583,7 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
return ', '.join(
[
f"{k}={v.obj_type.__name__}({v.__get__(self)})"
for k, v in self._all_fields_attrs()
for k, v in self._autoserializable_fields()
if isinstance(v, _SerializableField)
]
)
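Taken together, the renames above describe the declarative style the rest of the commit migrates to: persistent state becomes class-level field descriptors and marshal/unmarshal are inherited. A hedged usage sketch (field classes and helper names are the ones visible in these hunks; SampleState and the exact round-trip behaviour are assumptions):

from uds.core.util import autoserializable

class SampleState(autoserializable.AutoSerializable):
    # Declarative fields replace the hand-written marshal()/unmarshal() pairs
    _name = autoserializable.StringField(default='')
    _ready = autoserializable.BoolField(default=False)
    _queue = autoserializable.ListField[int]()

obj = SampleState()
obj._name = 'vm-01'
obj._queue = [1, 2, 3]

data = obj.marshal()                                    # iterates _autoserializable_fields()
assert autoserializable.is_autoserializable_data(data)  # data starts with HEADER_BASE

clone = SampleState()
clone.unmarshal(data)
assert clone._name == 'vm-01' and clone._queue == [1, 2, 3]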

View File

@@ -84,7 +84,7 @@ class ManagedObjectModel(UUIDModel):
# Re-serialize to db
self.data = obj.serialize()
self.save(update_fields=['data'])
obj.flag_for_upgrade(False)
obj.mark_for_upgrade(False)
self._cached_instance = None # Ensures returns correct value on get_instance
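The model hunks in this commit all repeat the same load-then-upgrade step. A compact restatement of that pattern (the function and its arguments are hypothetical glue; the method names are the ones in the diff):

def load_and_upgrade(model, obj) -> None:
    # Pattern shared by ManagedObjectModel, ServicePoolPublication and UserService
    obj.deserialize(model.data)           # may call mark_for_upgrade() internally
    if obj.needs_upgrade():
        model.data = obj.serialize()      # re-serialize in the new format
        model.save(update_fields=['data'])
        obj.mark_for_upgrade(False)       # flag handled, clear it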

View File

@@ -157,7 +157,7 @@ class ServicePoolPublication(UUIDModel):
publication.deserialize(self.data)
if publication.needs_upgrade():
self.update_data(publication)
publication.flag_for_upgrade(False)
publication.mark_for_upgrade(False)
return publication

View File

@@ -245,7 +245,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
if us.needs_upgrade():
self.data = us.serialize()
self.save(update_fields=['data'])
us.flag_for_upgrade(False)
us.mark_for_upgrade(False)
except Exception:
logger.exception(

View File

@@ -176,4 +176,4 @@ class LinuxOsManager(osmanagers.OSManager):
self._flag_processes_unused_machines()
# Flag that we need an upgrade (remarshal and save)
self.flag_for_upgrade()
self.mark_for_upgrade()

View File

@@ -126,7 +126,7 @@ class LinuxRandomPassManager(LinuxOsManager):
self.user_account.value = values[1].decode()
LinuxOsManager.unmarshal(self, codecs.decode(values[2], 'hex'))
self.flag_for_upgrade()
self.mark_for_upgrade()
# Recalculate flag indicating if we need to process unused machines
self._flag_processes_unused_machines()

View File

@@ -170,4 +170,4 @@ class WindowsOsManager(osmanagers.OSManager):
self._flag_processes_unused_machines()
# Flag that we need an upgrade (remarshal and save)
self.flag_for_upgrade()
self.mark_for_upgrade()

View File

@@ -466,4 +466,4 @@ class WinDomainOsManager(WindowsOsManager):
self.remove_on_exit.value = True
super().unmarshal(codecs.decode(values[5].encode(), 'hex'))
self.flag_for_upgrade() # Force upgrade to new format
self.mark_for_upgrade() # Force upgrade to new format

View File

@@ -145,4 +145,4 @@ class WinRandomPassManager(WindowsOsManager):
self.password.value = CryptoManager().decrypt(values[2])
super().unmarshal(codecs.decode(values[3].encode(), 'hex'))
self.flag_for_upgrade() # Force upgrade to new format
self.mark_for_upgrade() # Force upgrade to new format

View File

@@ -30,43 +30,59 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle # nosec: not insecure, we are loading our own data
import logging
import typing
import collections.abc
import enum
import logging
import pickle # nosec: not insecure, we are loading our own data
import typing
from uds.core import services, consts
from uds.core import consts, services
from uds.core.managers.user_service import UserServiceManager
from uds.core.types.states import State
from uds.core.util import log
from uds.core.util import autoserializable, log
from .jobs import OVirtDeferredRemoval
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds import models
from .service import OVirtLinkedService
from .publication import OVirtPublication
from .service import OVirtLinkedService
logger = logging.getLogger(__name__)
(
opCreate,
opStart,
opStop,
opSuspend,
opRemove,
opWait,
opError,
opFinish,
opRetry,
opChangeMac,
) = range(10)
UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state')
class Operation(enum.IntEnum):
"""
Operation enumeration
"""
CREATE = 0
START = 1
STOP = 2
SUSPEND = 3
REMOVE = 4
WAIT = 5
ERROR = 6
FINISH = 7
RETRY = 8
CHANGEMAC = 9
opUnknown = 99
@staticmethod
def from_int(value: int) -> 'Operation':
try:
return Operation(value)
except ValueError:
return Operation.opUnknown
class OVirtLinkedDeployment(services.UserService):
UP_STATES: typing.Final[set[str]] = {'up', 'reboot_in_progress', 'powering_up', 'restoring_state'}
class OVirtLinkedDeployment(services.UserService, autoserializable.AutoSerializable):
"""
This class generates the user consumable elements of the service tree.
@@ -81,13 +97,12 @@ class OVirtLinkedDeployment(services.UserService):
# : Recheck every six seconds by default (for task methods)
suggested_delay = 6
# own vars
_name: str
_ip: str
_mac: str
_vmid: str
_reason: str
_queue: list[int]
_name = autoserializable.StringField(default='')
_ip = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='')
_vmid = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]()
# Utility overrides for type checking...
def service(self) -> 'OVirtLinkedService':
@@ -99,35 +114,13 @@ class OVirtLinkedDeployment(services.UserService):
raise Exception('No publication for this element!')
return typing.cast('OVirtPublication', pub)
def initialize(self):
self._name = ''
self._ip = ''
self._mac = ''
self._vmid = ''
self._reason = ''
self._queue = []
# Serializable needed methods
def marshal(self) -> bytes:
"""
Does nothing right here, we will use environment storage in this sample
"""
return b'\1'.join(
[
b'v1',
self._name.encode('utf8'),
self._ip.encode('utf8'),
self._mac.encode('utf8'),
self._vmid.encode('utf8'),
self._reason.encode('utf8'),
pickle.dumps(self._queue, protocol=0),
]
)
def unmarshal(self, data: bytes) -> None:
"""
Does nothing here also, all data are kept at environment storage
"""
if not data.startswith(b'v'):
return super().unmarshal(data)
vals = data.split(b'\1')
if vals[0] == b'v1':
self._name = vals[1].decode('utf8')
@@ -135,9 +128,11 @@ class OVirtLinkedDeployment(services.UserService):
self._mac = vals[3].decode('utf8')
self._vmid = vals[4].decode('utf8')
self._reason = vals[5].decode('utf8')
self._queue = pickle.loads(
vals[6]
) # nosec: not insecure, we are loading our own data
self._queue = [
Operation.from_int(i) for i in pickle.loads(vals[6])
] # nosec: not insecure, we are loading our own data
self.mark_for_upgrade() # Flag so manager can save it again with new format
def get_name(self) -> str:
if self._name == '':
@@ -207,14 +202,14 @@ class OVirtLinkedDeployment(services.UserService):
return State.FINISHED
try:
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
if state == 'unknown':
return self.__error('Machine is not available anymore')
return self._error('Machine is not available anymore')
if state not in UP_STATES:
self._queue = [opStart, opFinish]
return self.__executeQueue()
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
self.cache.put('ready', '1')
except Exception as e:
@@ -235,7 +230,7 @@ class OVirtLinkedDeployment(services.UserService):
) -> typing.Optional[collections.abc.MutableMapping[str, typing.Any]]:
return self.service().getConsoleConnection(self._vmid)
def desktopLogin(
def desktop_login(
self,
username: str,
password: str,
@@ -255,10 +250,10 @@ if sys.platform == 'win32':
def process_ready_from_os_manager(self, data: typing.Any) -> str:
# Here we will check for suspending the VM (when full ready)
logger.debug('Checking if cache 2 for %s', self._name)
if self.__getCurrentOp() == opWait:
if self._get_current_op() == Operation.WAIT:
logger.debug('Machine is ready. Moving to level 2')
self.__popCurrentOp() # Remove current state
return self.__executeQueue()
self._pop_current_op() # Remove current state
return self._execute_queue()
# Do not need to go to level 2 (opWait is in fact "waiting for moving machine to cache level 2)
return State.FINISHED
@@ -267,66 +262,70 @@ if sys.platform == 'win32':
Deploys a service instance for a user.
"""
logger.debug('Deploying for user')
self.__initQueueForDeploy(False)
return self.__executeQueue()
self._init_queue_for_deploy(False)
return self._execute_queue()
def deploy_for_cache(self, cacheLevel: int) -> str:
"""
Deploys a service instance for cache
"""
self.__initQueueForDeploy(cacheLevel == self.L2_CACHE)
return self.__executeQueue()
self._init_queue_for_deploy(cacheLevel == self.L2_CACHE)
return self._execute_queue()
def __initQueueForDeploy(self, forLevel2: bool = False) -> None:
def _init_queue_for_deploy(self, forLevel2: bool = False) -> None:
if forLevel2 is False:
self._queue = [opCreate, opChangeMac, opStart, opFinish]
self._queue = [Operation.CREATE, Operation.CHANGEMAC, Operation.START, Operation.FINISH]
else:
self._queue = [opCreate, opChangeMac, opStart, opWait, opSuspend, opFinish]
self._queue = [
Operation.CREATE,
Operation.CHANGEMAC,
Operation.START,
Operation.WAIT,
Operation.SUSPEND,
Operation.FINISH,
]
def __checkMachineState(
self, chkState: typing.Union[list[str], tuple[str, ...], str]
) -> str:
def _check_machine_state(self, check_state: collections.abc.Iterable[str]) -> str:
logger.debug(
'Checking that state of machine %s (%s) is %s',
self._vmid,
self._name,
chkState,
check_state,
)
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
# If we want to check a state and the machine does not exist (except in the case that we want to check this)
if state == 'unknown' and chkState != 'unknown':
return self.__error('Machine not found')
if state == 'unknown' and check_state != 'unknown':
return self._error('Machine not found')
ret = State.RUNNING
if isinstance(chkState, (list, tuple)):
for cks in chkState:
if isinstance(check_state, (list, tuple)):
for cks in check_state:
if state == cks:
ret = State.FINISHED
break
else:
if state == chkState:
if state == check_state:
ret = State.FINISHED
return ret
def __getCurrentOp(self) -> int:
def _get_current_op(self) -> Operation:
if not self._queue:
return opFinish
return Operation.FINISH
return self._queue[0]
def __popCurrentOp(self) -> int:
def _pop_current_op(self) -> Operation:
if not self._queue:
return opFinish
return Operation.FINISH
res = self._queue.pop(0)
return res
return self._queue.pop(0)
def __pushFrontOp(self, op: int):
def _push_front_op(self, op: Operation) -> None:
self._queue.insert(0, op)
def __error(self, reason: typing.Union[str, Exception]) -> str:
def _error(self, reason: typing.Union[str, Exception]) -> str:
"""
Internal method to set object as error state
@@ -340,47 +339,45 @@ if sys.platform == 'win32':
if self._vmid != '': # Powers off
OVirtDeferredRemoval.remove(self.service().parent(), self._vmid)
self._queue = [opError]
self._queue = [Operation.ERROR]
self._reason = reason
return State.ERROR
def __executeQueue(self) -> str:
self.__debug('executeQueue')
op = self.__getCurrentOp()
def _execute_queue(self) -> str:
self._debug('executeQueue')
op = self._get_current_op()
if op == opError:
if op == Operation.ERROR:
return State.ERROR
if op == opFinish:
if op == Operation.FINISH:
return State.FINISHED
fncs: dict[int, typing.Optional[collections.abc.Callable[[], str]]] = {
opCreate: self.__create,
opRetry: self.__retry,
opStart: self.__startMachine,
opStop: self.__stopMachine,
opSuspend: self.__suspendMachine,
opWait: self.__wait,
opRemove: self.__remove,
opChangeMac: self.__changeMac,
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
Operation.STOP: self._stop_machine,
Operation.SUSPEND: self._suspend_machine,
Operation.WAIT: self._wait,
Operation.REMOVE: self._remove,
Operation.CHANGEMAC: self._change_mac,
}
try:
execFnc: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
operation_runner: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
if execFnc is None:
return self.__error(
f'Unknown operation found at execution queue ({op})'
)
if operation_runner is None:
return self._error(f'Unknown operation found at execution queue ({op})')
execFnc()
operation_runner()
return State.RUNNING
except Exception as e:
return self.__error(e)
return self._error(e)
# Queue execution methods
def __retry(self) -> str:
def _retry(self) -> str:
"""
Used to retry an operation
In fact, this will never be invoked unless we push it twice, because
@@ -390,56 +387,56 @@ if sys.platform == 'win32':
"""
return State.FINISHED
def __wait(self) -> str:
def _wait(self) -> str:
"""
Executes opWait; it simply waits for something "external" to end
"""
return State.RUNNING
def __create(self) -> str:
def _create(self) -> str:
"""
Deploys a machine from template for user/cache
"""
templateId = self.publication().getTemplateId()
template_id = self.publication().get_template_id()
name = self.get_name()
if name == consts.NO_MORE_NAMES:
raise Exception(
'No more names available for this service. (Increase digits for this service to fix)'
)
name = self.service().sanitizeVmName(
name = self.service().sanitized_name(
name
) # oVirt doesn't let us create machines with more than 15 chars!!!
comments = 'UDS Linked clone'
self._vmid = self.service().deployFromTemplate(name, comments, templateId)
self._vmid = self.service().deploy_from_template(name, comments, template_id)
if self._vmid is None:
raise Exception('Can\'t create machine')
return State.RUNNING
def __remove(self) -> str:
def _remove(self) -> str:
"""
Removes a machine from system
"""
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
if state == 'unknown':
raise Exception('Machine not found')
if state != 'down':
self.__pushFrontOp(opStop)
self.__executeQueue()
self._push_front_op(Operation.STOP)
self._execute_queue()
else:
self.service().removeMachine(self._vmid)
return State.RUNNING
def __startMachine(self) -> str:
def _start_machine(self) -> str:
"""
Powers on the machine
"""
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
if state == 'unknown':
raise Exception('Machine not found')
@@ -448,18 +445,18 @@ if sys.platform == 'win32':
return State.RUNNING
if state not in ('down', 'suspended'):
self.__pushFrontOp(
opRetry
self._push_front_op(
Operation.RETRY
) # Will call "check Retry", which will finish immediately and call this one again
self.service().startMachine(self._vmid)
return State.RUNNING
def __stopMachine(self) -> str:
def _stop_machine(self) -> str:
"""
Powers off the machine
"""
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
if state == 'unknown':
raise Exception('Machine not found')
@@ -468,19 +465,19 @@ if sys.platform == 'win32':
return State.RUNNING
if state not in ('up', 'suspended'):
self.__pushFrontOp(
opRetry
self._push_front_op(
Operation.RETRY
) # Will call "check Retry", which will finish immediately and call this one again
else:
self.service().stopMachine(self._vmid)
return State.RUNNING
def __suspendMachine(self) -> str:
def _suspend_machine(self) -> str:
"""
Suspends the machine
"""
state = self.service().getMachineState(self._vmid)
state = self.service().get_machine_state(self._vmid)
if state == 'unknown':
raise Exception('Machine not found')
@@ -489,15 +486,15 @@ if sys.platform == 'win32':
return State.RUNNING
if state != 'up':
self.__pushFrontOp(
opRetry
self._push_front_op(
Operation.RETRY
) # Remember here, the return State.FINISH will make this retry be "popped" right at return
else:
self.service().suspendMachine(self._vmid)
self.service().suspend_machine(self._vmid)
return State.RUNNING
def __changeMac(self) -> str:
def _change_mac(self) -> str:
"""
Changes the mac of the first nic
"""
@@ -508,37 +505,37 @@ if sys.platform == 'win32':
return State.RUNNING
# Check methods
def __checkCreate(self) -> str:
def _create_checker(self) -> str:
"""
Checks the state of a deploy for an user or cache
"""
return self.__checkMachineState('down')
return self._check_machine_state('down')
def __checkStart(self) -> str:
def _start_checker(self) -> str:
"""
Checks if machine has started
"""
return self.__checkMachineState(UP_STATES)
return self._check_machine_state(UP_STATES)
def __checkStop(self) -> str:
def _stop_checker(self) -> str:
"""
Checks if machine has stopped
"""
return self.__checkMachineState('down')
return self._check_machine_state('down')
def __checkSuspend(self) -> str:
def _suspend_checker(self) -> str:
"""
Check if the machine has suspended
"""
return self.__checkMachineState('suspended')
return self._check_machine_state('suspended')
def __checkRemoved(self) -> str:
def _remove_checker(self) -> str:
"""
Checks if a machine has been removed
"""
return self.__checkMachineState('unknown')
return self._check_machine_state('unknown')
def __checkMac(self) -> str:
def _mac_checker(self) -> str:
"""
Checks if change mac operation has finished.
@@ -550,58 +547,54 @@ if sys.platform == 'win32':
"""
Check what operation is going on, and act accordingly
"""
self.__debug('check_state')
op = self.__getCurrentOp()
self._debug('check_state')
op = self._get_current_op()
if op == opError:
if op == Operation.ERROR:
return State.ERROR
if op == opFinish:
if op == Operation.FINISH:
return State.FINISHED
fncs = {
opCreate: self.__checkCreate,
opRetry: self.__retry,
opWait: self.__wait,
opStart: self.__checkStart,
opStop: self.__checkStop,
opSuspend: self.__checkSuspend,
opRemove: self.__checkRemoved,
opChangeMac: self.__checkMac,
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry,
Operation.WAIT: self._wait,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.SUSPEND: self._suspend_checker,
Operation.REMOVE: self._remove_checker,
Operation.CHANGEMAC: self._mac_checker,
}
try:
chkFnc: typing.Optional[
typing.Optional[collections.abc.Callable[[], str]]
] = fncs.get(op, None)
operation_checker: typing.Optional[typing.Optional[collections.abc.Callable[[], str]]] = fncs.get(op, None)
if chkFnc is None:
return self.__error(
f'Unknown operation found at check queue ({op})'
)
if operation_checker is None:
return self._error(f'Unknown operation found at check queue ({op})')
state = chkFnc()
state = operation_checker()
if state == State.FINISHED:
self.__popCurrentOp() # Remove runing op
return self.__executeQueue()
self._pop_current_op() # Remove running op
return self._execute_queue()
return state
except Exception as e:
return self.__error(e)
return self._error(e)
def move_to_cache(self, newLevel: int) -> str:
"""
Moves machines between cache levels
"""
if opRemove in self._queue:
if Operation.REMOVE in self._queue:
return State.RUNNING
if newLevel == self.L1_CACHE:
self._queue = [opStart, opFinish]
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [opStart, opSuspend, opFinish]
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]
return self.__executeQueue()
return self._execute_queue()
def error_reason(self) -> str:
"""
@@ -617,7 +610,7 @@ if sys.platform == 'win32':
"""
Invoked for destroying a deployed service
"""
self.__debug('destroy')
self._debug('destroy')
if self._vmid == '':
self._queue = []
self._reason = "canceled"
@@ -625,16 +618,16 @@ if sys.platform == 'win32':
# If executing something, wait until finished to remove it
# We simply replace the execution queue
op = self.__getCurrentOp()
op = self._get_current_op()
if op == opError:
return self.__error('Machine is already in error state!')
if op == Operation.ERROR:
return self._error('Machine is already in error state!')
if op in (opFinish, opWait):
self._queue = [opStop, opRemove, opFinish]
return self.__executeQueue()
if op in (Operation.FINISH, Operation.WAIT):
self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
return self._execute_queue()
self._queue = [op, opStop, opRemove, opFinish]
self._queue = [op, Operation.STOP, Operation.REMOVE, Operation.FINISH]
# Do not execute anything here, just continue normally
return State.RUNNING
@@ -651,21 +644,21 @@ if sys.platform == 'win32':
return self.destroy()
@staticmethod
def __op2str(op: int) -> str:
def _op2str(op: Operation) -> str:
return {
opCreate: 'create',
opStart: 'start',
opStop: 'stop',
opSuspend: 'suspend',
opRemove: 'remove',
opWait: 'wait',
opError: 'error',
opFinish: 'finish',
opRetry: 'retry',
opChangeMac: 'changing mac',
Operation.CREATE: 'create',
Operation.START: 'start',
Operation.STOP: 'stop',
Operation.SUSPEND: 'suspend',
Operation.REMOVE: 'remove',
Operation.WAIT: 'wait',
Operation.ERROR: 'error',
Operation.FINISH: 'finish',
Operation.RETRY: 'retry',
Operation.CHANGEMAC: 'changing mac',
}.get(op, '????')
def __debug(self, txt):
def _debug(self, txt: str) -> None:
logger.debug(
'State at %s: name: %s, ip: %s, mac: %s, vmid:%s, queue: %s',
txt,
@@ -673,5 +666,5 @@ if sys.platform == 'win32':
self._ip,
self._mac,
self._vmid,
[OVirtLinkedDeployment.__op2str(op) for op in self._queue],
[OVirtLinkedDeployment._op2str(op) for op in self._queue],
)
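The old module-level opCreate…opChangeMac integers now survive only inside queues pickled by the previous code; Operation.from_int() maps them back (degrading to opUnknown for anything unexpected) and the dispatch tables key on the enum. A self-contained sketch of that migration path, reproducing the enum values shown above (legacy_blob is an invented example of a previously pickled queue):

import enum
import pickle

class Operation(enum.IntEnum):
    # Values mirror the old (opCreate, opStart, ...) = range(10) tuple
    CREATE = 0
    START = 1
    STOP = 2
    SUSPEND = 3
    REMOVE = 4
    WAIT = 5
    ERROR = 6
    FINISH = 7
    RETRY = 8
    CHANGEMAC = 9
    opUnknown = 99

    @staticmethod
    def from_int(value: int) -> 'Operation':
        try:
            return Operation(value)
        except ValueError:
            return Operation.opUnknown

# A queue pickled by the old code was just a list of plain ints (protocol 0)
legacy_blob = pickle.dumps([0, 9, 1, 7], protocol=0)
queue = [Operation.from_int(i) for i in pickle.loads(legacy_blob)]  # nosec: our own data
assert queue == [Operation.CREATE, Operation.CHANGEMAC, Operation.START, Operation.FINISH]
assert Operation.from_int(42) is Operation.opUnknown  # unknown codes degrade gracefully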

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@@ -30,14 +30,17 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from datetime import datetime
import logging
import typing
import collections.abc
import logging
from re import T
import typing
from datetime import datetime
from django.utils.translation import gettext as _
from uds.core.services import Publication
from uds.core.types.states import State
from uds.core.util import autoserializable
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@@ -46,85 +49,61 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class OVirtPublication(Publication):
class OVirtPublication(Publication, autoserializable.AutoSerializable):
"""
This class provides the publication of a oVirtLinkedService
"""
suggested_delay = (
20 # : Suggested recheck time if publication is unfinished in seconds
)
_name: str
_reason: str
_destroyAfter: str
_templateId: str
_state: str
suggested_delay = 20 # : Suggested recheck time if publication is unfinished in seconds
_name = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
_destroy_after = autoserializable.BoolField(default=False)
_template_id = autoserializable.StringField(default='')
_state = autoserializable.StringField(default='r')
def service(self) -> 'OVirtLinkedService':
return typing.cast('OVirtLinkedService', super().service())
def initialize(self) -> None:
"""
This method will be invoked by default __init__ of base class, so it gives
us the oportunity to initialize whataver we need here.
In our case, we setup a few attributes..
"""
# We do not check anything at marshal method, so we ensure that
# default values are correctly handled by marshal.
self._name = ''
self._reason = ''
self._destroyAfter = 'f'
self._templateId = ''
self._state = 'r'
def marshal(self) -> bytes:
"""
returns data from an instance of Sample Publication serialized
"""
return '\t'.join(
[
'v1',
self._name,
self._reason,
self._destroyAfter,
self._templateId,
self._state,
]
).encode('utf8')
def unmarshal(self, data: bytes) -> None:
"""
deserializes the data and loads it inside instance.
"""
if not data.startswith(b'v'):
return super().unmarshal(data)
logger.debug('Data: %s', data)
vals = data.decode('utf8').split('\t')
if vals[0] == 'v1':
(
self._name,
self._reason,
self._destroyAfter,
self._templateId,
destroy_after,
self._template_id,
self._state,
) = vals[1:]
else:
raise ValueError('Invalid data format')
self._destroy_after = destroy_after == 't'
self.mark_for_upgrade() # Mark so manager knows it has to be saved again
def publish(self) -> str:
"""
Realizes the publication of the service
"""
self._name = self.service().sanitizeVmName(
self._name = self.service().sanitized_name(
'UDSP ' + self.servicepool_name() + "-" + str(self.revision())
)
comments = _('UDS pub for {0} at {1}').format(
self.servicepool_name(), str(datetime.now()).split('.')[0]
)
self._reason = '' # No error, no reason for it
self._destroyAfter = 'f'
self._destroy_after = False
self._state = 'locked'
try:
self._templateId = self.service().makeTemplate(self._name, comments)
self._template_id = self.service().make_template(self._name, comments)
except Exception as e:
self._state = 'error'
self._reason = str(e)
@@ -143,7 +122,7 @@ class OVirtPublication(Publication):
return State.ERROR
try:
self._state = self.service().getTemplateState(self._templateId)
self._state = self.service().get_template_state(self._template_id)
if self._state == 'removed':
raise Exception('Template has been removed!')
except Exception as e:
@@ -153,7 +132,8 @@ class OVirtPublication(Publication):
# If publication is done (template is ready) and cancel was requested, do it just after template becomes ready
if self._state == 'ok':
if self._destroyAfter == 't':
if self._destroy_after:
self._destroy_after = False
return self.destroy()
return State.FINISHED
@@ -182,11 +162,11 @@ class OVirtPublication(Publication):
"""
# We do not do anything else to destroy this instance of publication
if self._state == 'locked':
self._destroyAfter = 't'
self._destroy_after = True
return State.RUNNING
try:
self.service().removeTemplate(self._templateId)
self.service().removeTemplate(self._template_id)
except Exception as e:
self._state = 'error'
self._reason = str(e)
@@ -204,8 +184,8 @@ class OVirtPublication(Publication):
# Methods provided below are specific for this publication
# and will be used by user deployments that uses this kind of publication
def getTemplateId(self) -> str:
def get_template_id(self) -> str:
"""
Returns the template id associated with the publication
"""
return self._templateId
return self._template_id
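The publication follows the same migration: the legacy unmarshal still parses the tab-separated 'v1' record, turns the 't'/'f' destroy flag into the new BoolField value, and then marks itself for upgrade so the next save is written through AutoSerializable. A small sketch of just the legacy parsing step, with the record layout taken from the removed marshal() and an invented sample record:

def parse_legacy_publication(data: bytes) -> dict:
    # Legacy record, as written by the removed marshal():
    #   'v1' \t name \t reason \t destroy_after ('t'/'f') \t template_id \t state
    vals = data.decode('utf8').split('\t')
    if vals[0] != 'v1':
        raise ValueError('Unknown legacy format')
    name, reason, destroy_after, template_id, state = vals[1:]
    return {
        'name': name,
        'reason': reason,
        'destroy_after': destroy_after == 't',  # str flag becomes the new bool field value
        'template_id': template_id,
        'state': state,
    }

record = b'v1\tUDSP pool-3\t\tf\ttmpl-42\tok'
assert parse_legacy_publication(record)['destroy_after'] is False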

View File

@@ -282,13 +282,13 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
)
)
def sanitizeVmName(self, name: str) -> str:
def sanitized_name(self, name: str) -> str:
"""
Ovirt only allows machine names with [a-zA-Z0-9_-]
"""
return re.sub("[^a-zA-Z0-9_-]", "_", name)
def makeTemplate(self, name: str, comments: str) -> str:
def make_template(self, name: str, comments: str) -> str:
"""
Invokes makeTemplate from parent provider, completing params
@@ -315,7 +315,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
self.display.value,
)
def getTemplateState(self, templateId: str) -> str:
def get_template_state(self, templateId: str) -> str:
"""
Invokes getTemplateState from parent provider
@@ -328,7 +328,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
"""
return self.parent().getTemplateState(templateId)
def deployFromTemplate(self, name: str, comments: str, templateId: str) -> str:
def deploy_from_template(self, name: str, comments: str, templateId: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
@@ -362,7 +362,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
"""
self.parent().removeTemplate(templateId)
def getMachineState(self, machineId: str) -> str:
def get_machine_state(self, machineId: str) -> str:
"""
Invokes getMachineState from parent provider
(returns if machine is "active" or "inactive"
@@ -404,7 +404,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
"""
self.parent().stopMachine(machineId)
def suspendMachine(self, machineId: str) -> None:
def suspend_machine(self, machineId: str) -> None:
"""
Tries to start a machine. No check is done, it is simply requested to oVirt

View File

@@ -46,7 +46,7 @@ from uds.core.util.model import sql_stamp_seconds
if typing.TYPE_CHECKING:
from uds import models
from .service import OGService
from .publication import OGPublication
from .publication import OpenGnsysPublication
from uds.core.util.storage import Storage
logger = logging.getLogger(__name__)
@@ -70,7 +70,7 @@ class Operation(enum.IntEnum):
return Operation.UNKNOWN
class OGDeployment(services.UserService, autoserializable.AutoSerializable):
class OpenGnsysUserService(services.UserService, autoserializable.AutoSerializable):
"""
This class generates the user consumable elements of the service tree.
@@ -109,7 +109,7 @@ class OGDeployment(services.UserService, autoserializable.AutoSerializable):
def service(self) -> 'OGService':
return typing.cast('OGService', super().service())
def publication(self) -> 'OGPublication':
def publication(self) -> 'OpenGnsysPublication':
pub = super().publication()
if pub is None:
raise Exception('No publication for this element!')
@@ -134,7 +134,7 @@ class OGDeployment(services.UserService, autoserializable.AutoSerializable):
Operation.from_int(i) for i in pickle.loads(vals[7])
] # nosec: not insecure, we are loading our own data
self.flag_for_upgrade() # Flag so manager can save it again with new format
self.mark_for_upgrade() # Flag so manager can save it again with new format
def get_name(self) -> str:
return self._name
@@ -450,5 +450,5 @@ class OGDeployment(services.UserService, autoserializable.AutoSerializable):
self._ip,
self._mac,
self._machine_id,
[OGDeployment.__op2str(op) for op in self._queue],
[OpenGnsysUserService.__op2str(op) for op in self._queue],
)

View File

@@ -45,7 +45,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class OGPublication(Publication, autoserializable.AutoSerializable):
class OpenGnsysPublication(Publication, autoserializable.AutoSerializable):
"""
This class provides the publication of a oVirtLinkedService
"""

View File

@@ -39,8 +39,8 @@ from uds.core import types, services, consts
from uds.core.ui import gui
from . import helpers
from .deployment import OGDeployment
from .publication import OGPublication
from .deployment import OpenGnsysUserService
from .publication import OpenGnsysPublication
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@@ -87,9 +87,9 @@ class OGService(services.Service):
# : Types of publications (prepared data for deploys)
# : In our case, we do not need a publication, so this is None
publication_type = OGPublication
publication_type = OpenGnsysPublication
# : Types of deploys (services in cache and/or assigned to users)
user_service_type = OGDeployment
user_service_type = OpenGnsysUserService
allowed_protocols = types.transports.Protocol.generic_vdi()
services_type_provided = types.services.ServiceType.VDI

View File

@@ -114,7 +114,7 @@ class OpenNebulaLiveDeployment(services.UserService, autoserializable.AutoSerial
self._reason = vals[5].decode('utf8')
self._queue = [Operation.from_int(i) for i in pickle.loads(vals[6])] # nosec
self.flag_for_upgrade() # Flag so manager can save it again with new format
self.mark_for_upgrade() # Flag so manager can save it again with new format
def get_name(self) -> str:
if self._name == '':
@@ -162,8 +162,8 @@ class OpenNebulaLiveDeployment(services.UserService, autoserializable.AutoSerial
def get_console_connection(self) -> dict[str, typing.Any]:
return self.service().getConsoleConnection(self._vmid)
def desktopLogin(self, username: str, password: str, domain: str = ''):
return self.service().desktopLogin(self._vmid, username, password, domain)
def desktop_login(self, username: str, password: str, domain: str = ''):
return self.service().desktop_login(self._vmid, username, password, domain)
def process_ready_from_os_manager(self, data: typing.Any) -> str:
# Here we will check for suspending the VM (when full ready)

View File

@@ -299,7 +299,7 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
'ticket': {'value': display['passwd'], 'expiry': ''},
}
def desktopLogin(self, machineId: str, username: str, password: str, domain: str) -> dict[str, typing.Any]:
def desktop_login(self, machineId: str, username: str, password: str, domain: str) -> dict[str, typing.Any]:
'''
Not provided by OpenNebula API right now
'''

View File

@@ -79,7 +79,7 @@ class OpenNebulaLivePublication(Publication, autoserializable.AutoSerializable):
self._destroy_after = False
self.flag_for_upgrade() # Flag so manager can save it again with new format
self.mark_for_upgrade() # Flag so manager can save it again with new format
def publish(self) -> str:
"""

View File

@@ -312,10 +312,10 @@ class OpenNebulaLiveService(services.Service):
def getConsoleConnection(self, machineId: str) -> dict[str, typing.Any]:
return self.parent().getConsoleConnection(machineId)
def desktopLogin(
def desktop_login(
self, machineId: str, username: str, password: str, domain: str
) -> dict[str, typing.Any]:
return self.parent().desktopLogin(machineId, username, password, domain)
return self.parent().desktop_login(machineId, username, password, domain)
def is_avaliable(self) -> bool:
return self.parent().is_available()

View File

@@ -124,7 +124,7 @@ class OpenStackLiveDeployment(
self._reason = vals[5].decode('utf8')
self._queue = [Operation.from_int(i) for i in pickle.loads(vals[6])] # nosec
self.flag_for_upgrade() # Flag so manager can save it again with new format
self.mark_for_upgrade() # Flag so manager can save it again with new format
def get_name(self) -> str:
if self._name == '':

View File

@@ -82,7 +82,7 @@ class OpenStackLivePublication(Publication, autoserializable.AutoSerializable):
self._destroy_after = destroy_after == 'y'
self.flag_for_upgrade() # This will force remarshalling
self.mark_for_upgrade() # This will force remarshalling
def publish(self) -> str:
"""

View File

@@ -236,7 +236,7 @@ class IPMachinesService(IPServiceBase):
# Sets maximum services for this, and loads "hosts" into cache
self.userservices_limit = len(self.hosts)
self.flag_for_upgrade() # Flag for upgrade as soon as possible
self.mark_for_upgrade() # Flag for upgrade as soon as possible
def is_usable(self, locked: typing.Optional[typing.Union[str, int]], now: int) -> int:
# If _maxSessionForMachine is 0, it can be used only if not locked

View File

@@ -165,11 +165,11 @@ class TaskStatus:
def is_finished(self) -> bool:
return self.status == 'stopped'
def isCompleted(self) -> bool:
def is_completed(self) -> bool:
return self.is_finished() and self.exitstatus == 'OK'
def is_errored(self) -> bool:
return self.is_finished() and not self.isCompleted()
return self.is_finished() and not self.is_completed()
@dataclasses.dataclass(slots=True)
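is_completed()/is_errored() are simple compositions over the task's final status and exit code. A tiny restatement, reduced to the two fields this hunk actually touches (TaskStatusSketch is illustrative, not the real dataclass):

import dataclasses

@dataclasses.dataclass(slots=True)
class TaskStatusSketch:
    status: str
    exitstatus: str

    def is_finished(self) -> bool:
        return self.status == 'stopped'

    def is_completed(self) -> bool:
        return self.is_finished() and self.exitstatus == 'OK'

    def is_errored(self) -> bool:
        return self.is_finished() and not self.is_completed()

assert TaskStatusSketch('stopped', 'OK').is_completed()
assert TaskStatusSketch('stopped', 'some error').is_errored()
assert not TaskStatusSketch('running', 'OK').is_errored()  # not finished yet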

View File

@@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle # nosec: controlled data
import enum
import logging
import typing
import collections.abc
@@ -38,7 +39,7 @@ import collections.abc
from uds.core import services, consts
from uds.core.managers.user_service import UserServiceManager
from uds.core.types.states import State
from uds.core.util import log
from uds.core.util import log, autoserializable
from uds.core.util.model import sql_stamp_seconds
from .jobs import ProxmoxDeferredRemoval
@@ -53,25 +54,44 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
(
opCreate,
opStart,
opStop,
opShutdown,
opRemove,
opWait,
opError,
opFinish,
opRetry,
opGetMac,
opGracelyStop,
) = range(11)
UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state')
class Operation(enum.IntEnum):
"""
Operation codes for Proxmox deployment
"""
CREATE = 0
START = 1
STOP = 2
SHUTDOWN = 3
REMOVE = 4
WAIT = 5
ERROR = 6
FINISH = 7
RETRY = 8
GET_MAC = 9
GRACEFUL_STOP = 10
opUnknown = 99
@staticmethod
def from_int(value: int) -> 'Operation':
try:
return Operation(value)
except ValueError:
return Operation.opUnknown
# The difference between "SHUTDOWN" and "GRACEFUL_STOP" is that the first one
# is used to "best try to stop" the machine to move to L2 (that is, if it cannot be stopped,
# it will be moved to L2 anyway, but keeps running), and the second one is used to "best try to stop"
# the machine when destroying it (that is, if it cannot be stopped, it will be destroyed anyway after a
# timeout of at most GUEST_SHUTDOWN_WAIT seconds)
# UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state')
GUEST_SHUTDOWN_WAIT = 90 # Seconds
class ProxmoxDeployment(services.UserService):
class ProxmoxDeployment(services.UserService, autoserializable.AutoSerializable):
"""
This class generates the user consumable elements of the service tree.
@@ -86,14 +106,22 @@ class ProxmoxDeployment(services.UserService):
# : Recheck every this many seconds by default (for task methods)
suggested_delay = 12
_name = autoserializable.StringField(default='')
_ip = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='')
_task = autoserializable.StringField(default='')
_vmid = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]()
# own vars
_name: str
_ip: str
_mac: str
_task: str
_vmid: str
_reason: str
_queue: list[int]
# _name: str
# _ip: str
# _mac: str
# _task: str
# _vmid: str
# _reason: str
# _queue: list[int]
# Utility overrides for type checking...
def service(self) -> 'ProxmoxLinkedService':
@@ -105,37 +133,13 @@ class ProxmoxDeployment(services.UserService):
raise Exception('No publication for this element!')
return typing.cast('ProxmoxPublication', pub)
def initialize(self):
self._name = ''
self._ip = ''
self._mac = ''
self._task = ''
self._vmid = ''
self._reason = ''
self._queue = []
# Serializable needed methods
def marshal(self) -> bytes:
"""
Does nothing right here, we will use environment storage in this sample
"""
return b'\1'.join(
[
b'v1',
self._name.encode('utf8'),
self._ip.encode('utf8'),
self._mac.encode('utf8'),
self._task.encode('utf8'),
self._vmid.encode('utf8'),
self._reason.encode('utf8'),
pickle.dumps(self._queue, protocol=0),
]
)
def unmarshal(self, data: bytes) -> None:
"""
Does nothing here also, all data are kept at environment storage
"""
if not data.startswith(b'v'):
return super().unmarshal(data)
vals = data.split(b'\1')
if vals[0] == b'v1':
self._name = vals[1].decode('utf8')
@@ -144,7 +148,9 @@ class ProxmoxDeployment(services.UserService):
self._task = vals[4].decode('utf8')
self._vmid = vals[5].decode('utf8')
self._reason = vals[6].decode('utf8')
self._queue = pickle.loads(vals[7]) # nosec: controled data
self._queue = [Operation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controlled data
self.mark_for_upgrade() # Flag so manager can save it again with new format
def get_name(self) -> str:
if self._name == '':
@@ -186,11 +192,11 @@ class ProxmoxDeployment(services.UserService):
except client.ProxmoxConnectionError:
raise # If connection fails, let it fail on parent
except Exception as e:
return self.__error(f'Machine not found: {e}')
return self._error(f'Machine not found: {e}')
if vmInfo.status == 'stopped':
self._queue = [opStart, opFinish]
return self.__executeQueue()
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
self.cache.put('ready', '1')
return State.FINISHED
@@ -210,7 +216,7 @@ class ProxmoxDeployment(services.UserService):
) -> typing.Optional[collections.abc.MutableMapping[str, typing.Any]]:
return self.service().get_console_connection(self._vmid)
def desktopLogin(
def desktop_login(
self,
username: str,
password: str,
@@ -233,10 +239,10 @@ if sys.platform == 'win32':
def process_ready_from_os_manager(self, data: typing.Any) -> str:
# Here we will check for suspending the VM (when full ready)
logger.debug('Checking if cache 2 for %s', self._name)
if self.__getCurrentOp() == opWait:
if self._get_current_op() == Operation.WAIT:
logger.debug('Machine is ready. Moving to level 2')
self.__popCurrentOp() # Remove current state
return self.__executeQueue()
self._pop_current_op() # Remove current state
return self._execute_queue()
# Do not need to go to level 2 (opWait is in fact "waiting for moving machine to cache level 2)
return State.FINISHED
@@ -245,50 +251,57 @@ if sys.platform == 'win32':
Deploys a service instance for a user.
"""
logger.debug('Deploying for user')
self.__initQueueForDeploy(False)
return self.__executeQueue()
self._init_queue_for_deploy(False)
return self._execute_queue()
def deploy_for_cache(self, cacheLevel: int) -> str:
"""
Deploys a service instance for cache
"""
self.__initQueueForDeploy(cacheLevel == self.L2_CACHE)
return self.__executeQueue()
self._init_queue_for_deploy(cacheLevel == self.L2_CACHE)
return self._execute_queue()
def __initQueueForDeploy(self, forLevel2: bool = False) -> None:
def _init_queue_for_deploy(self, forLevel2: bool = False) -> None:
if forLevel2 is False:
self._queue = [opCreate, opGetMac, opStart, opFinish]
self._queue = [Operation.CREATE, Operation.GET_MAC, Operation.START, Operation.FINISH]
else:
self._queue = [opCreate, opGetMac, opStart, opWait, opShutdown, opFinish]
self._queue = [
Operation.CREATE,
Operation.GET_MAC,
Operation.START,
Operation.WAIT,
Operation.SHUTDOWN,
Operation.FINISH,
]
def __setTask(self, upid: 'client.types.UPID'):
def _store_task(self, upid: 'client.types.UPID'):
self._task = ','.join([upid.node, upid.upid])
def __getTask(self) -> tuple[str, str]:
def _retrieve_task(self) -> tuple[str, str]:
vals = self._task.split(',')
return (vals[0], vals[1])
def __getCurrentOp(self) -> int:
def _get_current_op(self) -> Operation:
if not self._queue:
return opFinish
return Operation.FINISH
return self._queue[0]
def __popCurrentOp(self) -> int:
def _pop_current_op(self) -> Operation:
if not self._queue:
return opFinish
return Operation.FINISH
res = self._queue.pop(0)
return res
def __pushFrontOp(self, op: int):
def _push_front_op(self, op: Operation) -> None:
self._queue.insert(0, op)
def __retryLater(self) -> str:
self.__pushFrontOp(opRetry)
def _retry_later(self) -> str:
self._push_front_op(Operation.RETRY)
return State.RUNNING
def __error(self, reason: typing.Union[str, Exception]) -> str:
def _error(self, reason: typing.Union[str, Exception]) -> str:
"""
Internal method to set object as error state
@@ -302,48 +315,46 @@ if sys.platform == 'win32':
if self._vmid != '': # Powers off
ProxmoxDeferredRemoval.remove(self.service().parent(), int(self._vmid))
self._queue = [opError]
self._queue = [Operation.ERROR]
self._reason = reason
return State.ERROR
def __executeQueue(self) -> str:
def _execute_queue(self) -> str:
self.__debug('executeQueue')
op = self.__getCurrentOp()
op = self._get_current_op()
if op == opError:
if op == Operation.ERROR:
return State.ERROR
if op == opFinish:
if op == Operation.FINISH:
return State.FINISHED
fncs: collections.abc.Mapping[int, typing.Optional[collections.abc.Callable[[], str]]] = {
opCreate: self.__create,
opRetry: self.__retry,
opStart: self.__startMachine,
opStop: self.__stopMachine,
opGracelyStop: self.__gracelyStop,
opShutdown: self.__shutdownMachine,
opWait: self.__wait,
opRemove: self.__remove,
opGetMac: self.__updateVmMacAndHA,
fncs: collections.abc.Mapping[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
Operation.STOP: self._stop_machine,
Operation.GRACEFUL_STOP: self._gracely_stop,
Operation.SHUTDOWN: self._shutdown_machine,
Operation.WAIT: self._wait,
Operation.REMOVE: self._remove,
Operation.GET_MAC: self._update_machine_mac_and_ha,
}
try:
execFnc: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
operation_executor: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
if execFnc is None:
return self.__error(
f'Unknown operation found at execution queue ({op})'
)
if operation_executor is None:
return self._error(f'Unknown operation found at execution queue ({op})')
execFnc()
operation_executor()
return State.RUNNING
except Exception as e:
return self.__error(e)
return self._error(e)
# Queue execution methods
def __retry(self) -> str:
def _retry(self) -> str:
"""
Used to retry an operation
In fact, this will never be invoked unless we push it twice, because
@@ -353,17 +364,17 @@ if sys.platform == 'win32':
"""
return State.FINISHED
def __wait(self) -> str:
def _wait(self) -> str:
"""
Executes opWait; it simply waits for something "external" to end
"""
return State.RUNNING
def __create(self) -> str:
def _create(self) -> str:
"""
Deploys a machine from template for user/cache
"""
templateId = self.publication().machine()
template_id = self.publication().machine()
name = self.get_name()
if name == consts.NO_MORE_NAMES:
raise Exception(
@@ -372,15 +383,15 @@ if sys.platform == 'win32':
comments = 'UDS Linked clone'
taskResult = self.service().clone_machine(name, comments, templateId)
task_result = self.service().clone_machine(name, comments, template_id)
self.__setTask(taskResult.upid)
self._store_task(task_result.upid)
self._vmid = str(taskResult.vmid)
self._vmid = str(task_result.vmid)
return State.RUNNING
def __remove(self) -> str:
def _remove(self) -> str:
"""
Removes a machine from system
"""
@@ -391,26 +402,26 @@ if sys.platform == 'win32':
if vmInfo.status != 'stopped':
logger.debug('Info status: %s', vmInfo)
self._queue = [opStop, opRemove, opFinish]
return self.__executeQueue()
self.__setTask(self.service().remove_machine(int(self._vmid)))
self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
return self._execute_queue()
self._store_task(self.service().remove_machine(int(self._vmid)))
return State.RUNNING
def __startMachine(self) -> str:
def _start_machine(self) -> str:
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
return self.__retryLater()
return self._retry_later()
except Exception as e:
raise Exception('Machine not found on start machine') from e
if vmInfo.status == 'stopped':
self.__setTask(self.service().start_machine(int(self._vmid)))
self._store_task(self.service().start_machine(int(self._vmid)))
return State.RUNNING
def __stopMachine(self) -> str:
def _stop_machine(self) -> str:
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
except Exception as e:
@@ -418,44 +429,42 @@ if sys.platform == 'win32':
if vmInfo.status != 'stopped':
logger.debug('Stopping machine %s', vmInfo)
self.__setTask(self.service().stop_machine(int(self._vmid)))
self._store_task(self.service().stop_machine(int(self._vmid)))
return State.RUNNING
def __shutdownMachine(self) -> str:
def _shutdown_machine(self) -> str:
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
return State.RUNNING # Try again later
except Exception as e:
raise Exception('Machine not found on suspend machine') from e
raise Exception('Machine not found or suspended machine') from e
if vmInfo.status != 'stopped':
self.__setTask(self.service().shutdown_machine(int(self._vmid)))
self._store_task(self.service().shutdown_machine(int(self._vmid)))
return State.RUNNING
def __gracelyStop(self) -> str:
def _gracely_stop(self) -> str:
"""
Tries to stop machine using vmware tools
If it takes too long to stop, or vmware tools are not installed,
Tries to stop machine using qemu guest tools
If it takes too long to stop, or qemu guest tools are not installed,
will use "power off" "a las bravas"
"""
self._task = ''
shutdown = -1 # Means machine already stopped
vmInfo = self.service().get_machine_info(int(self._vmid))
if vmInfo.status != 'stopped':
self.__setTask(self.service().shutdown_machine(int(self._vmid)))
self._store_task(self.service().shutdown_machine(int(self._vmid)))
shutdown = sql_stamp_seconds()
logger.debug('Stoped vm using guest tools')
self.storage.put_pickle('shutdown', shutdown)
return State.RUNNING
def __updateVmMacAndHA(self) -> str:
def _update_machine_mac_and_ha(self) -> str:
try:
self.service().enable_ha(
int(self._vmid), True
) # Enable HA before continuing here
self.service().enable_ha(int(self._vmid), True) # Enable HA before continuing here
# Set vm mac address now on first interface
self.service().set_machine_mac(int(self._vmid), self.get_unique_id())
@@ -465,11 +474,11 @@ if sys.platform == 'win32':
return State.RUNNING
# Check methods
def __checkTaskFinished(self):
def _check_task_finished(self) -> str:
if self._task == '':
return State.FINISHED
node, upid = self.__getTask()
node, upid = self._retrieve_task()
try:
task = self.service().get_task_info(node, upid)
@@ -477,38 +486,38 @@ if sys.platform == 'win32':
return State.RUNNING # Try again later
if task.is_errored():
return self.__error(task.exitstatus)
return self._error(task.exitstatus)
if task.isCompleted():
if task.is_completed():
return State.FINISHED
return State.RUNNING
def __checkCreate(self) -> str:
def _create_checker(self) -> str:
"""
Checks the state of a deploy for an user or cache
"""
return self.__checkTaskFinished()
return self._check_task_finished()
def __checkStart(self) -> str:
def _start_checker(self) -> str:
"""
Checks if machine has started
"""
return self.__checkTaskFinished()
return self._check_task_finished()
def __checkStop(self) -> str:
def _stop_checker(self) -> str:
"""
Checks if machine has stopped
"""
return self.__checkTaskFinished()
return self._check_task_finished()
def __checkShutdown(self) -> str:
def _shutdown_checker(self) -> str:
"""
Check if the machine has suspended
"""
return self.__checkTaskFinished()
return self._check_task_finished()
def __checkGracelyStop(self) -> str:
def _graceful_stop_checker(self) -> str:
"""
Check if the machine has gracefully stopped (timed shutdown)
"""
@@ -521,7 +530,7 @@ if sys.platform == 'win32':
if shutdown_start == 0: # Was shut down the hard way
logger.debug('Machine DOES NOT HAVE guest tools')
return self.__checkStop()
return self._stop_checker()
logger.debug('Checking State')
# Check if machine is already stopped
@@ -537,21 +546,21 @@ if sys.platform == 'win32':
)
# Not stopped by guest in time, but must be stopped normally
self.storage.put_pickle('shutdown', 0)
return self.__stopMachine() # Launch "hard" stop
return self._stop_machine() # Launch "hard" stop
return State.RUNNING
def __checkRemoved(self) -> str:
def _remove_checker(self) -> str:
"""
Checks if a machine has been removed
"""
return self.__checkTaskFinished()
return self._check_task_finished()
def __checkMac(self) -> str:
def _mac_checker(self) -> str:
"""
Checks if change mac operation has finished.
Changing nic configuration es 1-step operation, so when we check it here, it is already done
Changing nic configuration is a 1-step operation, so when we check it here, it is already done
"""
return State.FINISHED
@@ -560,58 +569,56 @@ if sys.platform == 'win32':
Check what operation is going on, and act accordingly
"""
self.__debug('check_state')
op = self.__getCurrentOp()
op = self._get_current_op()
if op == opError:
if op == Operation.ERROR:
return State.ERROR
if op == opFinish:
if op == Operation.FINISH:
return State.FINISHED
fncs = {
opCreate: self.__checkCreate,
opRetry: self.__retry,
opWait: self.__wait,
opStart: self.__checkStart,
opStop: self.__checkStop,
opGracelyStop: self.__checkGracelyStop,
opShutdown: self.__checkShutdown,
opRemove: self.__checkRemoved,
opGetMac: self.__checkMac,
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry,
Operation.WAIT: self._wait,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.GRACEFUL_STOP: self._graceful_stop_checker,
Operation.SHUTDOWN: self._shutdown_checker,
Operation.REMOVE: self._remove_checker,
Operation.GET_MAC: self._mac_checker,
}
try:
chkFnc: typing.Optional[
typing.Optional[collections.abc.Callable[[], str]]
] = fncs.get(op, None)
if chkFnc is None:
return self.__error(
f'Unknown operation found at check queue ({op})'
operation_checker: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(
op, None
)
state = chkFnc()
if operation_checker is None:
return self._error(f'Unknown operation found at check queue ({op})')
state = operation_checker()
if state == State.FINISHED:
self.__popCurrentOp() # Remove runing op
return self.__executeQueue()
self._pop_current_op() # Remove running op
return self._execute_queue()
return state
except Exception as e:
return self.__error(e)
return self._error(e)
def move_to_cache(self, newLevel: int) -> str:
"""
Moves machines between cache levels
"""
if opRemove in self._queue:
if Operation.REMOVE in self._queue:
return State.RUNNING
if newLevel == self.L1_CACHE:
self._queue = [opStart, opFinish]
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [opStart, opShutdown, opFinish]
self._queue = [Operation.START, Operation.SHUTDOWN, Operation.FINISH]
return self.__executeQueue()
return self._execute_queue()
def error_reason(self) -> str:
"""
@@ -635,17 +642,17 @@ if sys.platform == 'win32':
# If executing something, wait until finished to remove it
# We simply replace the execution queue
op = self.__getCurrentOp()
op = self._get_current_op()
if op == opError:
return self.__error('Machine is already in error state!')
if op == Operation.ERROR:
return self._error('Machine is already in error state!')
lst = [] if not self.service().try_graceful_shutdown() else [opGracelyStop]
queue = lst + [opStop, opRemove, opFinish]
lst: list[Operation] = [] if not self.service().try_graceful_shutdown() else [Operation.GRACEFUL_STOP]
queue = lst + [Operation.STOP, Operation.REMOVE, Operation.FINISH]
if op in (opFinish, opWait):
if op in (Operation.FINISH, Operation.WAIT):
self._queue[:] = queue
return self.__executeQueue()
return self._execute_queue()
self._queue = [op] + queue
# Do not execute anything.here, just continue normally
@@ -664,19 +671,19 @@ if sys.platform == 'win32':
return self.destroy()
@staticmethod
def __op2str(op: int) -> str:
def __op2str(op: Operation) -> str:
return {
opCreate: 'create',
opStart: 'start',
opStop: 'stop',
opShutdown: 'suspend',
opGracelyStop: 'gracely stop',
opRemove: 'remove',
opWait: 'wait',
opError: 'error',
opFinish: 'finish',
opRetry: 'retry',
opGetMac: 'getting mac',
Operation.CREATE: 'create',
Operation.START: 'start',
Operation.STOP: 'stop',
Operation.SHUTDOWN: 'suspend',
Operation.GRACEFUL_STOP: 'gracefully stop',
Operation.REMOVE: 'remove',
Operation.WAIT: 'wait',
Operation.ERROR: 'error',
Operation.FINISH: 'finish',
Operation.RETRY: 'retry',
Operation.GET_MAC: 'getting mac',
}.get(op, '????')
def __debug(self, txt):
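The check_state dispatch above pairs each queued Operation with a checker and pops the queue when a step finishes. A minimal, standalone sketch of that pattern, assuming only the names visible in this diff (the checkers here are stand-ins, not the real Proxmox calls):

import collections.abc
import enum


class Operation(enum.IntEnum):
    CREATE = 0
    START = 1
    FINISH = 2
    ERROR = 3


class State:
    RUNNING = 'running'
    FINISHED = 'finished'
    ERROR = 'error'


class QueuedDeployment:
    def __init__(self) -> None:
        # Pending steps, consumed front to back
        self._queue: list[Operation] = [Operation.CREATE, Operation.START, Operation.FINISH]

    def check_state(self) -> str:
        op = self._queue[0] if self._queue else Operation.FINISH
        if op == Operation.ERROR:
            return State.ERROR
        if op == Operation.FINISH:
            return State.FINISHED
        checkers: dict[Operation, collections.abc.Callable[[], str]] = {
            Operation.CREATE: lambda: State.FINISHED,  # stand-in: pretend the task completed
            Operation.START: lambda: State.FINISHED,
        }
        checker = checkers.get(op)
        if checker is None:
            return State.ERROR  # unknown operation in the queue
        state = checker()
        if state == State.FINISHED:
            self._queue.pop(0)  # step done; next call handles the following step
        return state

The real implementation additionally calls _execute_queue() after popping, so the next operation is launched immediately instead of waiting for the next poll.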

View File

@@ -36,6 +36,7 @@ import collections.abc
from django.utils.translation import gettext as _
from uds.core import services
from uds.core.util import autoserializable
from uds.core.types.states import State
# Not imported at runtime, just for type checking
@@ -46,52 +47,28 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class ProxmoxPublication(services.Publication):
class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable):
suggested_delay = 20
_name: str
_vm: str
_task: str
_state: str
_operation: str
_destroy_after_finish: str
_reason: str
def __init__(self, environment, **kwargs):
services.Publication.__init__(self, environment, **kwargs)
self._name = ''
self._vm = ''
self._task = ''
self._state = ''
self._operation = ''
self._destroy_after_finish = ''
self._reason = ''
_name = autoserializable.StringField(default='')
_vm = autoserializable.StringField(default='')
_task = autoserializable.StringField(default='')
_state = autoserializable.StringField(default='')
_operation = autoserializable.StringField(default='')
_destroy_after = autoserializable.BoolField(default=False)
_reason = autoserializable.StringField(default='')
# Utility overrides for type checking...
def service(self) -> 'ProxmoxLinkedService':
return typing.cast('ProxmoxLinkedService', super().service())
def marshal(self) -> bytes:
"""
returns data from an instance of Sample Publication serialized
"""
return '\t'.join(
[
'v1',
self._name,
self._vm,
self._task,
self._state,
self._operation,
self._destroy_after_finish,
self._reason,
]
).encode('utf8')
def unmarshal(self, data: bytes) -> None:
"""
deserializes the data and loads it inside instance.
"""
if not data.startswith(b'v'):
return super().unmarshal(data)
logger.debug('Data: %s', data)
vals = data.decode('utf8').split('\t')
if vals[0] == 'v1':
@@ -101,9 +78,15 @@ class ProxmoxPublication(services.Publication):
self._task,
self._state,
self._operation,
self._destroy_after_finish,
destroy_after,
self._reason,
) = vals[1:]
else:
raise ValueError('Invalid data format')
self._destroy_after = destroy_after != ''
self.mark_for_upgrade() # Flag so manager can save it again with new format
def publish(self) -> str:
"""
@@ -127,7 +110,7 @@ class ProxmoxPublication(services.Publication):
self._task = ','.join((task.upid.node, task.upid.upid))
self._state = State.RUNNING
self._operation = 'p' # Publishing
self._destroy_after_finish = ''
self._destroy_after = False
return State.RUNNING
except Exception as e:
logger.exception('Caught exception %s', e)
@@ -154,7 +137,7 @@ class ProxmoxPublication(services.Publication):
self._reason = task.exitstatus
self._state = State.ERROR
else: # Finished
if self._destroy_after_finish:
if self._destroy_after:
return self.destroy()
self._state = State.FINISHED
if self._operation == 'p': # not Destroying
@@ -177,18 +160,18 @@ class ProxmoxPublication(services.Publication):
def finish(self) -> None:
self._task = ''
self._destroy_after_finish = ''
self._destroy_after = False
def destroy(self) -> str:
if (
self._state == State.RUNNING and self._destroy_after_finish is False
self._state == State.RUNNING and self._destroy_after is False
): # If called destroy twice, will BREAK STOP publication
self._destroy_after_finish = 'y'
self._destroy_after = True
return State.RUNNING
self.state = State.RUNNING
self._operation = 'd'
self._destroy_after_finish = ''
self._destroy_after = False
try:
task = self.service().remove_machine(self.machine())
self._task = ','.join((task.node, task.upid))
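The publication above now declares its persisted state as autoserializable fields and lets the base class handle marshal()/unmarshal(). A minimal sketch of that pattern, using only the field types named in this diff; that AutoSerializable can be used standalone (outside the Publication/UserService mix-ins) is an assumption here:

from uds.core.util import autoserializable


class ExampleState(autoserializable.AutoSerializable):
    # Hypothetical fields, mirroring the declarations above
    _name = autoserializable.StringField(default='')
    _destroy_after = autoserializable.BoolField(default=False)


state = ExampleState()
state._name = 'demo'
state._destroy_after = True

data = state.marshal()  # opaque bytes in the new automatic format

restored = ExampleState()
restored.unmarshal(data)
assert restored._name == 'demo' and restored._destroy_after

Fields missing from an older payload simply keep their declared defaults, which is why a newly added field such as _destroy_after can safely default to False.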

View File

@@ -118,7 +118,7 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
self._queue = pickle.loads(vals[6]) # nosec: not insecure, we are loading our own data
self._task = vals[7].decode('utf8')
self.flag_for_upgrade() # Force upgrade
self.mark_for_upgrade() # Force upgrade
def get_name(self) -> str:
if not self._name:

View File

@@ -33,10 +33,13 @@ from datetime import datetime
import logging
import typing
import collections.abc
from django.utils.translation import gettext as _
from uds.core.services import Publication
from uds.core.types.states import State
from uds.core.util import autoserializable
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@@ -45,52 +48,45 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class XenPublication(Publication):
class XenPublication(Publication, autoserializable.AutoSerializable):
suggested_delay = (
20 # : Suggested recheck time if publication is unfinished in seconds
)
_name: str = ''
_reason: str = ''
_destroyAfter: str = 'f'
_templateId: str = ''
_state: str = ''
_task: str = ''
_name = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
_destroy_after = autoserializable.BoolField(default=False)
_template_id = autoserializable.StringField(default='')
_state = autoserializable.StringField(default='')
_task = autoserializable.StringField(default='')
def service(self) -> 'XenLinkedService':
return typing.cast('XenLinkedService', super().service())
def marshal(self) -> bytes:
"""
returns data from an instance of Sample Publication serialized
"""
return '\t'.join(
[
'v1',
self._name,
self._reason,
self._destroyAfter,
self._templateId,
self._state,
self._task,
]
).encode('utf8')
def unmarshal(self, data: bytes) -> None:
"""
deserializes the data and loads it inside instance.
"""
if not data.startswith(b'v'):
return super().unmarshal(data)
# logger.debug('Data: {0}'.format(data))
vals = data.decode('utf8').split('\t')
if vals[0] == 'v1':
(
self._name,
self._reason,
self._destroyAfter,
self._templateId,
destroy_after,
self._template_id,
self._state,
self._task,
) = vals[1:]
else:
raise ValueError('Invalid data format')
self._destroy_after = destroy_after == 't'
self.mark_for_upgrade() # Force upgrade asap
def publish(self) -> str:
"""
@@ -103,7 +99,7 @@ class XenPublication(Publication):
self.servicepool_name(), str(datetime.now()).split('.')[0]
)
self._reason = '' # No error, no reason for it
self._destroyAfter = 'f'
self._destroy_after = False
self._state = 'ok'
try:
@@ -129,11 +125,12 @@ class XenPublication(Publication):
state, result = self.service().check_task_finished(self._task)
if state: # Finished
self._state = 'finished'
self._templateId = result
if self._destroyAfter == 't':
self._template_id = result
if self._destroy_after:
self._destroy_after = False
return self.destroy()
self.service().convertToTemplate(self._templateId)
self.service().convertToTemplate(self._template_id)
return State.FINISHED
except Exception as e:
self._state = 'error'
@@ -148,11 +145,11 @@ class XenPublication(Publication):
def destroy(self) -> str:
# We do not do anything else to destroy this instance of publication
if self._state == 'ok':
self._destroyAfter = 't'
self._destroy_after = True
return State.RUNNING
try:
self.service().removeTemplate(self._templateId)
self.service().removeTemplate(self._template_id)
except Exception as e:
self._state = 'error'
self._reason = str(e)
@@ -171,4 +168,4 @@ class XenPublication(Publication):
"""
Returns the template id associated with the publication
"""
return self._templateId
return self._template_id
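One migration detail worth keeping in mind from the block above: the old format stored the flag as the literal strings 't'/'f', while the new BoolField holds a real boolean, so the conversion happens once in unmarshal() and the string encoding is never written again. A tiny illustration:

def legacy_destroy_after(raw: str) -> bool:
    """Translate the old 't'/'f' marker into the BoolField value used now."""
    return raw == 't'


assert legacy_destroy_after('t') is True
assert legacy_destroy_after('f') is False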

View File

@@ -113,7 +113,7 @@ class SPICETransport(BaseSpiceTransport):
r.ssl_connection = self.allow_usb_redirection_new_plugs.as_bool()
# if sso: # If SSO requested, and when supported by platform
# userServiceInstance.desktopLogin(user, password, '')
# userServiceInstance.desktop_login(user, password, '')
sp = {
'as_file': r.as_file,

View File

@@ -156,7 +156,7 @@ class TSPICETransport(BaseSpiceTransport):
r.ssl_connection = self.ssl_connection.as_bool()
# if sso: # If SSO requested, and when supported by platform
# userServiceInstance.desktopLogin(user, password, '')
# userServiceInstance.desktop_login(user, password, '')
sp = {
'as_file': r.as_file,

View File

@@ -104,7 +104,7 @@ class RegexSerializationTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -110,7 +110,7 @@ class SimpleLdapSerializationTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -91,7 +91,7 @@ class LinuxOsManagerTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -84,7 +84,7 @@ class LinuxOsManagerSerialTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -117,7 +117,7 @@ class WindowsOsManagerSerialTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -91,7 +91,7 @@ class WindowsOsManagerSerialTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

@@ -84,7 +84,7 @@ class WindowsOsManagerSerialTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v'))

View File

View File

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.util import autoserializable
from uds.core.environment import Environment
from uds.services import Proxmox
from uds.services.Proxmox.deployment import Operation as Operation, ProxmoxDeployment as Deployment
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# vals = data.split(b'\1')
# if vals[0] == b'v1':
# self._name = vals[1].decode('utf8')
# self._ip = vals[2].decode('utf8')
# self._mac = vals[3].decode('utf8')
# self._task = vals[4].decode('utf8')
# self._vmid = vals[5].decode('utf8')
# self._reason = vals[6].decode('utf8')
# self._queue = [Operation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controled data
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_task',
'_vmid',
'_reason',
'_queue',
}
TEST_QUEUE: typing.Final[list[Operation]] = [
Operation.CREATE,
Operation.REMOVE,
Operation.RETRY,
]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01task\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE),
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
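For orientation, the v1 blob above reproduces the old hand-rolled layout: a version tag plus six text fields separated by 0x01 bytes, followed by the pickled queue, which is exactly what the commented-out legacy unmarshal consumed. A decoding sketch (test-side illustration, not the production parser):

parts = SERIALIZED_DEPLOYMENT_DATA['v1'].split(b'\x01', 7)  # maxsplit keeps the pickled tail intact
assert parts[0] == b'v1'
name, ip, mac, task, vmid, reason = (p.decode('utf8') for p in parts[1:7])
queue = pickle.loads(parts[7])  # nosec: data built a few lines above
assert (name, ip, mac, task, vmid, reason) == ('name', 'ip', 'mac', 'task', 'vmid', 'reason')
assert queue == TEST_QUEUE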
class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, version: str, instance: Deployment) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._vmid, 'vmid')
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._queue, TEST_QUEUE)
def test_marshaling(self) -> None:
# queue is kept on "storage", so we always need the same environment
environment = Environment.testing_environment()
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
for v in range(1, len(SERIALIZED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[version])
self.check(version, instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(
marshaled_data.startswith(b'\v')
) # Ensure fields have been marshalled using the new format
instance = _create_instance(marshaled_data)
self.assertFalse(
instance.needs_upgrade()
) # Reunmarshall again and check that remarshalled flag is not set
self.check(version, instance)
def test_marshaling_queue(self) -> None:
# queue is kept on "storage", so we always need the same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.put_pickle('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE)
instance._queue = [
Operation.CREATE,
Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
# Append something remarshall and check
instance._queue.insert(0, Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
],
)
# Remove something remarshall and check
instance._queue.pop(0)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)
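The assertion above treats _autoserializable_fields() as yielding (field_name, field) pairs; under that assumption, listing what an instance will persist is a short loop, e.g.:

with Environment.temporary_environment() as env:
    instance = Deployment(environment=env, service=None)
    persisted = sorted(name for name, _field in instance._autoserializable_fields())
    print(persisted)  # ['_ip', '_mac', '_name', '_queue', '_reason', '_task', '_vmid']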

View File

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
# We use commit/rollback
from tests.utils.test import UDSTestCase
from uds.core.managers import crypto
from uds.core.environment import Environment
from uds.services.Proxmox.provider import ProxmoxProvider
PROVIDER_SERIALIZE_DATA: typing.Final[str] = (
'R1VJWgF2Mf5E0Eb/AlXtUzvdsF+YFTi08PsxvNhRm+Hu3Waqa0Gw0WeReoM5XTnmvopa9+Ex99oRhzW7xr6THkQ7vMZvwKlcI77l'
'+Zz3FKXnbZnXZkqY0GIqvUzHjQra2Xx9koxkvtAXl64aldXSCjO4xMqCzsCsxgn2fPYnD76TgSccUftTLr5UpaKxXrOg5qr836Si'
'Y83F6Ko20viicmczi3NmMTR+ii+lmSCUrnRJc/IcxTrfmturJu0X0TipMX5C3xqMyIa1LtsPyHO3yTkYW9bGqP/B1DbDOHy27gu6'
'DlJwQpi2SRSYEO9pOCTosuVqOpP7hDwCFYn5D1jcEDKZcOmOMuN9qDD423eXUUoCRx2YHmSS0mt03nWxZScV7Ny4U9gmv/x2jsK3'
'4YL88CPDjh/eMGc7V+LhCSqpEOFmvEz6DVAf'
)
PROVIDER_FIELDS_DATA: typing.Final[dict[str, typing.Any]] = {
'host': 'proxmox_host',
'port': 8666,
'username': 'proxmox_username',
'password': 'proxmox_passwd',
'concurrent_creation_limit': 31,
'concurrent_removal_limit': 32,
'timeout': 9999,
'start_vmid': 99999,
'macs_range': '52:54:01:02:03:04-52:54:05:06:07:08',
}
class TestProxmoxProviderSerialization(UDSTestCase):
_oldUDSK: bytes
def setUp(self) -> None:
# Override UDSK
self._oldUDSK = crypto.UDSK
# Set same key as used to encrypt serialized data
crypto.UDSK = b'f#s35!e38xv%e-+i' # type: ignore # UDSK is final, but this is a test
return super().setUp()
def tearDown(self) -> None:
crypto.UDSK = self._oldUDSK # type: ignore # UDSK is final, but this is a test
return super().tearDown()
def test_provider_serialization(self) -> None:
provider = ProxmoxProvider(environment=Environment.testing_environment())
provider.deserialize(PROVIDER_SERIALIZE_DATA)
# Ensure values are ok
for field in PROVIDER_FIELDS_DATA:
self.assertEqual(getattr(provider, field).value, PROVIDER_FIELDS_DATA[field])

View File

@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.Proxmox import publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# logger.debug('Data: %s', data)
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._vm,
# self._task,
# self._state,
# self._operation,
# destroy_after,
# self._reason,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after != ''
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_vm',
'_task',
'_state',
'_operation',
'_destroy_after',
'_reason',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\tvm\ttask\tstate\toperation\ty\treason'
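The bytes above are the legacy tab-separated record that the commented-out unmarshal handled; splitting it by hand shows where check() gets _destroy_after == True from (an illustration mirroring that comment, not the production code):

fields = SERIALIZED_PUBLICATION_DATA.decode('utf8').split('\t')
assert fields[0] == 'v1'
name, vm, task, state, operation, destroy_after_raw, reason = fields[1:]
destroy_after = destroy_after_raw != ''  # 'y' (any non-empty value) meant "destroy after publishing"
assert destroy_after is True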
class OpenStackPublicationSerializationTest(UDSTestCase):
def check(self, instance: publication.ProxmoxPublication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._vm, 'vm')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._state, 'state')
self.assertEqual(instance._operation, 'operation')
self.assertTrue(instance._destroy_after)
self.assertEqual(instance._reason, 'reason')
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = publication.ProxmoxPublication(environment=environment, service=None)
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'\1'))
# Reunmarshall again and check that remarshalled flag is not set
instance = publication.ProxmoxPublication(environment=environment, service=None)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = publication.ProxmoxPublication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -57,6 +57,15 @@ from uds.services.OpenGnsys import deployment as deployment
# ) # nosec: not insecure, we are loading our own data
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_machine_id',
'_reason',
'_stamp',
'_queue',
}
TEST_QUEUE: typing.Final[list[deployment.Operation]] = [
deployment.Operation.CREATE,
@@ -70,7 +79,7 @@ SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
class OpenGnsysDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, instance: deployment.OGDeployment) -> None:
def check(self, instance: deployment.OpenGnsysUserService) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
@@ -83,8 +92,8 @@ class OpenGnsysDeploymentSerializationTest(UDSTransactionTestCase):
# queue is kept on "storage", so we always need the same environment
environment = Environment.testing_environment()
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.OGDeployment:
instance = deployment.OGDeployment(environment=environment, service=None)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.OpenGnsysUserService:
instance = deployment.OpenGnsysUserService(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
@@ -95,7 +104,7 @@ class OpenGnsysDeploymentSerializationTest(UDSTransactionTestCase):
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(marshaled_data.startswith(b'\v')) # Ensure fields have been marshalled using the new format
@@ -110,8 +119,8 @@ class OpenGnsysDeploymentSerializationTest(UDSTransactionTestCase):
# Store queue
environment.storage.put_pickle('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.OGDeployment:
instance = deployment.OGDeployment(environment=environment, service=None)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.OpenGnsysUserService:
instance = deployment.OpenGnsysUserService(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
@@ -150,3 +159,12 @@ class OpenGnsysDeploymentSerializationTest(UDSTransactionTestCase):
instance._queue,
[deployment.Operation.CREATE, deployment.Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = deployment.OpenGnsysUserService(environment=env, service=None)
instance_fields = set(f[0] for f in instance._autoserializable_fields())
self.assertSetEqual(instance_fields, EXPECTED_FIELDS)

View File

@@ -41,33 +41,43 @@ from uds.core.environment import Environment
from uds.services.OpenGnsys import publication
EXPECTED_FIELDS: typing.Final[set[str]] = set()
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b''
class OpenGnsysPublicationSerializationTest(UDSTestCase):
def check(self, instance: publication.OGPublication) -> None:
def check(self, instance: publication.OpenGnsysPublication) -> None:
# No data currently, all is fine
pass
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = publication.OGPublication(environment=environment, service=None)
instance = publication.OpenGnsysPublication(environment=environment, service=None)
#instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure remarshalled flag is set
#self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'\1'))
# Reunmarshall again and check that remarshalled flag is not set
instance = publication.OGPublication(environment=environment, service=None)
instance = publication.OpenGnsysPublication(environment=environment, service=None)
#instance.unmarshal(marshaled_data)
#self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = publication.OpenGnsysPublication(environment=env, service=None)
instance_fields = set(f[0] for f in instance._autoserializable_fields())
self.assertSetEqual(instance_fields, EXPECTED_FIELDS)

View File

@@ -54,6 +54,14 @@ from uds.services.OpenNebula import deployment as deployment
# self._queue = [Operation.from_int(i) for i in pickle.loads(vals[6])] # nosec
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
}
TEST_QUEUE: typing.Final[list[deployment.Operation]] = [
deployment.Operation.CREATE,
@@ -93,7 +101,7 @@ class OpenNebulaDeploymentSerializationTest(UDSTransactionTestCase):
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(
@@ -152,3 +160,11 @@ class OpenNebulaDeploymentSerializationTest(UDSTransactionTestCase):
instance._queue,
[deployment.Operation.CREATE, deployment.Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = deployment.OpenNebulaLiveDeployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -50,7 +50,13 @@ from uds.services.OpenNebula import publication
# self._name, self._reason, self._template_id, self._state = vals[1:]
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_reason',
'_template_id',
'_state',
'_destroy_after', # Newly added field
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\ttemplate_id\tstate'
@@ -61,6 +67,7 @@ class OpenGnsysPublicationSerializationTest(UDSTestCase):
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._template_id, 'template_id')
self.assertEqual(instance._state, 'state')
self.assertEqual(instance._destroy_after, False)
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
@@ -70,7 +77,7 @@ class OpenGnsysPublicationSerializationTest(UDSTestCase):
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
@@ -83,3 +90,11 @@ class OpenGnsysPublicationSerializationTest(UDSTestCase):
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = publication.OpenNebulaLivePublication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -52,6 +52,14 @@ from uds.services.OpenStack import deployment as deployment
# self._vmid = vals[4].decode('utf8')
# self._reason = vals[5].decode('utf8')
# self._queue = pickle.loads(vals[6]) # nosec: not insecure, we are loading our own data
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
}
TEST_QUEUE: typing.Final[list[deployment.Operation]] = [
deployment.Operation.CREATE,
@@ -67,7 +75,7 @@ LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reve
class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, instance: deployment.OpenStackLiveDeployment) -> None:
def check(self, version: str, instance: deployment.OpenStackLiveDeployment) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
@@ -88,10 +96,10 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
for v in range(1, len(SERIALIZED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[version])
self.check(instance)
self.check(version, instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(
@@ -102,7 +110,7 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
self.assertFalse(
instance.needs_upgrade()
) # Reunmarshall again and check that remarshalled flag is not set
self.check(instance)
self.check(version, instance)
def test_marshaling_queue(self) -> None:
# queue is kept on "storage", so we always need the same environment
@@ -150,3 +158,11 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
instance._queue,
[deployment.Operation.CREATE, deployment.Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = deployment.OpenStackLiveDeployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -33,14 +33,15 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.OpenStack import publication
# We use commit/rollback
from tests.utils.test import UDSTestCase
from uds.core.util import autoserializable
from uds.core.environment import Environment
from uds.services.OpenStack import publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
@@ -52,6 +53,13 @@ from uds.services.OpenStack import publication
# raise Exception('Invalid data')
# self._destroy_after = destroy_after == 'y'
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_reason',
'_template_id',
'_state',
'_destroy_after',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\ttemplate_id\tstate\ty'
@@ -73,7 +81,7 @@ class OpenStackPublicationSerializationTest(UDSTestCase):
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
@@ -86,3 +94,10 @@ class OpenStackPublicationSerializationTest(UDSTestCase):
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = publication.OpenStackLivePublication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

View File

@@ -0,0 +1,174 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.util import autoserializable
from uds.core.environment import Environment
from uds.services import Proxmox
from uds.services.OVirt.deployment import Operation as Operation, OVirtLinkedDeployment as Deployment
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# vals = data.split(b'\1')
# if vals[0] == b'v1':
# self._name = vals[1].decode('utf8')
# self._ip = vals[2].decode('utf8')
# self._mac = vals[3].decode('utf8')
# self._vmid = vals[4].decode('utf8')
# self._reason = vals[5].decode('utf8')
# self._queue = [
# Operation.from_int(i) for i in pickle.loads(vals[6])
# ] # nosec: not insecure, we are loading our own data
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
}
TEST_QUEUE: typing.Final[list[Operation]] = [
Operation.CREATE,
Operation.REMOVE,
Operation.RETRY,
]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE),
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
class OvirtDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, version: str, instance: Deployment) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
self.assertEqual(instance._vmid, 'vmid')
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._queue, TEST_QUEUE)
def test_marshaling(self) -> None:
# queue is kept on "storage", so we always need the same environment
environment = Environment.testing_environment()
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
for v in range(1, len(SERIALIZED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[version])
self.check(version, instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(
marshaled_data.startswith(b'\v')
) # Ensure fields have been marshalled using the new format
instance = _create_instance(marshaled_data)
self.assertFalse(
instance.needs_upgrade()
) # Reunmarshall again and check that remarshalled flag is not set
self.check(version, instance)
def test_marshaling_queue(self) -> None:
# queue is kept on "storage", so we always need the same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.put_pickle('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE)
instance._queue = [
Operation.CREATE,
Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
# Append something remarshall and check
instance._queue.insert(0, Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
],
)
# Remove something remarshall and check
instance._queue.pop(0)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.OVirt.publication import OVirtPublication as Publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# logger.debug('Data: %s', data)
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._reason,
# destroy_after,
# self._template_id,
# self._state,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after == 't'
# self.flag_for_upgrade(False) # reset flag
# self._destroy_after = destroy_after != ''
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_reason',
'_destroy_after',
'_template_id',
'_state',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\tt\ttemplate_id\tstate'
class OvirtPublicationSerializationTest(UDSTestCase):
def check(self, instance: Publication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._destroy_after, True)
self.assertEqual(instance._template_id, 'template_id')
self.assertEqual(instance._state, 'state')
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = Publication(environment=environment, service=None)
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'\1'))
# Reunmarshall again and check that remarshalled flag is not set
instance = Publication(environment=environment, service=None)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Publication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -69,6 +69,14 @@ from uds.services.PhysicalMachines import provider, service_multi, service_base
# self.userservices_limit = len(self.hosts)
# self.flag_for_upgrade() # Flag for upgrade as soon as possible
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_token',
'_port',
'_skip_time_on_failure',
'_max_session_for_machine',
'_lock_by_external_access',
'_use_random_ip',
}
STORED_IPS: typing.Final[typing.List[str]] = [f'{i};mac{i}~{i}' for i in range(1, 128)]
EDITABLE_STORED_IPS: typing.Final[typing.List[str]] = [i.split('~')[0] for i in STORED_IPS]
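Spelled out, the two comprehensions above fix the host format these tests rely on: every stored entry is 'ip;mac' plus a '~<n>' suffix, and the editable view keeps only the part before the '~'. For the first generated entry:

assert STORED_IPS[0] == '1;mac1~1'          # as kept in storage
assert EDITABLE_STORED_IPS[0] == '1;mac1'   # as shown in the editable field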
@@ -137,7 +145,7 @@ class PhysicalMachinesMultiSerializationTest(UDSTestCase):
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
instance.mark_for_upgrade(False) # reset flag
# Ensure fields have been marshalled using the new format
self.assertFalse(marshalled_data.startswith(b'v'))

View File

@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.Proxmox.publication import ProxmoxPublication as Publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# logger.debug('Data: %s', data)
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._vm,
# self._task,
# self._state,
# self._operation,
# destroy_after,
# self._reason,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after != ''
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_vm',
'_task',
'_state',
'_operation',
'_destroy_after',
'_reason',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\tvm\ttask\tstate\toperation\ty\treason'
class ProxmoxPublicationSerializationTest(UDSTestCase):
def check(self, instance: Publication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._vm, 'vm')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._state, 'state')
self.assertEqual(instance._operation, 'operation')
self.assertTrue(instance._destroy_after)
self.assertEqual(instance._reason, 'reason')
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = Publication(environment=environment, service=None)
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'\1'))
# Reunmarshall again and check that remarshalled flag is not set
instance = Publication(environment=environment, service=None)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If some field is added or removed, these tests will warn us about it so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Publication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.util import autoserializable
from uds.core.environment import Environment
from uds.services import Proxmox
from uds.services.Proxmox.deployment import Operation as Operation, ProxmoxDeployment as Deployment
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# vals = data.split(b'\1')
# if vals[0] == b'v1':
# self._name = vals[1].decode('utf8')
# self._ip = vals[2].decode('utf8')
# self._mac = vals[3].decode('utf8')
# self._task = vals[4].decode('utf8')
# self._vmid = vals[5].decode('utf8')
# self._reason = vals[6].decode('utf8')
# self._queue = [Operation.from_int(i) for i in pickle.loads(vals[7])] # nosec: controled data
# self.flag_for_upgrade() # Flag so manager can save it again with new format
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_task',
'_vmid',
'_reason',
'_queue',
}
TEST_QUEUE: typing.Final[list[Operation]] = [
Operation.CREATE,
Operation.REMOVE,
Operation.RETRY,
]
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01task\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE),
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
class ProxmoxDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, version: str, instance: Deployment) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._vmid, 'vmid')
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._queue, TEST_QUEUE)
def test_marshaling(self) -> None:
# the queue is kept in "storage", so we always need the same environment
environment = Environment.testing_environment()
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
for v in range(1, len(SERIALIZED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[version])
self.check(version, instance)
# Ensure the needs-upgrade flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
self.assertFalse(
marshaled_data.startswith(b'v1')
) # Ensure fields have been marshalled using the new format
instance = _create_instance(marshaled_data)
self.assertFalse(
instance.needs_upgrade()
) # Unmarshal again and check that the needs-upgrade flag is no longer set
self.check(version, instance)
def test_marshaling_queue(self) -> None:
# the queue is kept in "storage", so we always need the same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.put_pickle('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE)
instance._queue = [
Operation.CREATE,
Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
# Add something to the queue, remarshal and check
instance._queue.insert(0, Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
],
)
# Remove something from the queue, remarshal and check
instance._queue.pop(0)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If a field is added or removed, this test will warn us so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -60,7 +60,7 @@ PROVIDER_FIELDS_DATA: typing.Final[dict[str, typing.Any]] = {
}
class TestProxmoxProviderSerialization(UDSTestCase):
class ProxmoxProviderSerializationTest(UDSTestCase):
_oldUDSK: bytes
def setUp(self) -> None:

View File

@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.Proxmox.publication import ProxmoxPublication as Publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# logger.debug('Data: %s', data)
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._vm,
# self._task,
# self._state,
# self._operation,
# destroy_after,
# self._reason,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after != ''
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_vm',
'_task',
'_state',
'_operation',
'_destroy_after',
'_reason',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\tvm\ttask\tstate\toperation\ty\treason'
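# A minimal sketch (hypothetical helper, for illustration only, not used by the tests) of the
# legacy 'v1' publication record above: tab-separated utf-8 fields, with the destroy-after flag
# encoded as any non-empty string, matching the commented-out unmarshal logic at the top of
# this file.
def _build_v1_publication_blob(
    name: str, vm: str, task: str, state: str, operation: str, destroy_after: bool, reason: str
) -> bytes:
    fields = ['v1', name, vm, task, state, operation, 'y' if destroy_after else '', reason]
    return '\t'.join(fields).encode('utf8')
# For example, _build_v1_publication_blob('name', 'vm', 'task', 'state', 'operation', True, 'reason')
# reproduces SERIALIZED_PUBLICATION_DATA.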
class ProxmoxPublicationSerializationTest(UDSTestCase):
def check(self, instance: Publication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._vm, 'vm')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._state, 'state')
self.assertEqual(instance._operation, 'operation')
self.assertTrue(instance._destroy_after)
self.assertEqual(instance._reason, 'reason')
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = Publication(environment=environment, service=None)
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure the needs-upgrade flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v1'))
# Unmarshal again and check that the needs-upgrade flag is no longer set
instance = Publication(environment=environment, service=None)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If a field is added or removed, this test will warn us so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Publication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -1,9 +1,31 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
@@ -11,14 +33,14 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import pickle
import typing
# We use commit/rollback
from tests.utils.test import UDSTestCase
# We use storage, so we need transactional tests
from tests.utils.test import UDSTransactionTestCase
from uds.core.util import autoserializable
from uds.core.environment import Environment
from uds.services import Proxmox
from uds.services.Xen import deployment
from uds.services.Xen.deployment import Operation as Operation, XenLinkedDeployment as Deployment
# if not data.startswith(b'v'):
# return super().unmarshal(data)
@@ -36,90 +58,119 @@ from uds.services.Xen import deployment
# self.flag_for_upgrade() # Force upgrade
TEST_QUEUE: typing.Final[list[deployment.Operation]] = [
deployment.Operation.START,
deployment.Operation.STOP,
deployment.Operation.REMOVE,
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_ip',
'_mac',
'_vmid',
'_reason',
'_queue',
'_task',
}
TEST_QUEUE: typing.Final[list[Operation]] = [
Operation.CREATE,
Operation.REMOVE,
Operation.RETRY,
]
SERIALIZED_LINKED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\x01name\x01ip\x01mac\x01vmid\x01reason\x01' + pickle.dumps(TEST_QUEUE) + b'\x01task',
}
LAST_VERSION: typing.Final[str] = sorted(SERIALIZED_DEPLOYMENT_DATA.keys(), reverse=True)[0]
class XendDeploymentSerializationTest(UDSTestCase):
def check(self, version: str, instance: deployment.XenLinkedDeployment) -> None:
class XenDeploymentSerializationTest(UDSTransactionTestCase):
def check(self, version: str, instance: Deployment) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._ip, 'ip')
self.assertEqual(instance._mac, 'mac')
self.assertEqual(instance._task, 'task')
self.assertEqual(instance._vmid, 'vmid')
self.assertEqual(instance._reason, 'reason')
self.assertEqual(instance._queue, TEST_QUEUE)
self.assertEqual(instance._task, 'task')
def test_unmarshall_all_versions(self) -> None:
for v in range(1, len(SERIALIZED_LINKED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = deployment.XenLinkedDeployment(
environment=Environment.testing_environment(), service=None
)
instance.unmarshal(SERIALIZED_LINKED_DEPLOYMENT_DATA[version])
def test_marshaling(self) -> None:
# the queue is kept in "storage", so we always need the same environment
environment = Environment.testing_environment()
self.assertTrue(instance.needs_upgrade())
self.check(version, instance)
def test_marshaling(self):
VERSION = f'v{len(SERIALIZED_LINKED_DEPLOYMENT_DATA)}'
instance = deployment.XenLinkedDeployment(
environment=Environment.testing_environment(), service=None
)
instance.unmarshal(SERIALIZED_LINKED_DEPLOYMENT_DATA[VERSION])
marshaled_data = instance.marshal()
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
# Ensure fields has been marshalled using new format
self.assertFalse(marshaled_data.startswith(b'v'))
# Reunmarshall again and check that remarshalled flag is not set
instance = deployment.XenLinkedDeployment(
environment=Environment.testing_environment(), service=None
)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(VERSION, instance)
def test_marshaling_queue(self) -> None:
def _create_instance(unmarshal_data: 'bytes|None' = None) -> deployment.XenLinkedDeployment:
instance = deployment.XenLinkedDeployment(
environment=Environment.testing_environment(), service=None
)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
instance = _create_instance()
for v in range(1, len(SERIALIZED_DEPLOYMENT_DATA) + 1):
version = f'v{v}'
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[version])
self.check(version, instance)
# Ensure the needs-upgrade flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
instance._queue = [deployment.Operation.CREATE, deployment.Operation.REMOVE]
marshaled_data = instance.marshal()
self.assertFalse(
marshaled_data.startswith(b'v1')
) # Ensure fields have been marshalled using the new format
instance = _create_instance(marshaled_data)
self.assertFalse(
instance.needs_upgrade()
) # Unmarshal again and check that the needs-upgrade flag is no longer set
self.check(version, instance)
def test_marshaling_queue(self) -> None:
# the queue is kept in "storage", so we always need the same environment
environment = Environment.testing_environment()
# Store queue
environment.storage.put_pickle('queue', TEST_QUEUE)
def _create_instance(unmarshal_data: 'bytes|None' = None) -> Deployment:
instance = Deployment(environment=environment, service=None)
if unmarshal_data:
instance.unmarshal(unmarshal_data)
return instance
instance = _create_instance(SERIALIZED_DEPLOYMENT_DATA[LAST_VERSION])
self.assertEqual(instance._queue, TEST_QUEUE)
instance._queue = [
Operation.CREATE,
Operation.FINISH,
]
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(instance._queue, [deployment.Operation.CREATE, deployment.Operation.REMOVE])
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
# Add something to the queue, remarshal and check
instance._queue.append(deployment.Operation.START)
instance._queue.insert(0, Operation.RETRY)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(
instance._queue, [deployment.Operation.CREATE, deployment.Operation.REMOVE, deployment.Operation.START]
instance._queue,
[
Operation.RETRY,
Operation.CREATE,
Operation.FINISH,
],
)
# Remove something from the queue, remarshal and check
instance._queue.pop(0)
marshaled_data = instance.marshal()
instance = _create_instance(marshaled_data)
self.assertEqual(instance._queue, [deployment.Operation.REMOVE, deployment.Operation.START])
self.assertEqual(
instance._queue,
[Operation.CREATE, Operation.FINISH],
)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If a field is added or removed, this test will warn us so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Deployment(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.Xen.publication import XenPublication as Publication
# if not data.startswith(b'v'):
# return super().unmarshal(data)
# # logger.debug('Data: {0}'.format(data))
# vals = data.decode('utf8').split('\t')
# if vals[0] == 'v1':
# (
# self._name,
# self._reason,
# destroy_after,
# self._template_id,
# self._state,
# self._task,
# ) = vals[1:]
# else:
# raise ValueError('Invalid data format')
# self._destroy_after = destroy_after == 't'
# self.flag_for_upgrade() # Force upgrade asap
EXPECTED_FIELDS: typing.Final[set[str]] = {
'_name',
'_reason',
'_destroy_after',
'_template_id',
'_state',
'_task',
}
SERIALIZED_PUBLICATION_DATA: typing.Final[bytes] = b'v1\tname\treason\tt\ttemplate_id\tstate\ttask'
class XenPublicationSerializationTest(UDSTestCase):
def check(self, instance: Publication) -> None:
self.assertEqual(instance._name, 'name')
self.assertEqual(instance._reason, 'reason')
self.assertTrue(instance._destroy_after)
self.assertEqual(instance._template_id, 'template_id')
self.assertEqual(instance._state, 'state')
self.assertEqual(instance._task, 'task')
def test_marshaling(self) -> None:
environment = Environment.testing_environment()
instance = Publication(environment=environment, service=None)
instance.unmarshal(SERIALIZED_PUBLICATION_DATA)
self.check(instance)
# Ensure the needs-upgrade flag is set
self.assertTrue(instance.needs_upgrade())
instance.mark_for_upgrade(False) # reset flag
marshaled_data = instance.marshal()
# Ensure fields have been marshalled using the new format
self.assertFalse(marshaled_data.startswith(b'v1'))
# Unmarshal again and check that the needs-upgrade flag is no longer set
instance = Publication(environment=environment, service=None)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(instance)
def test_autoserialization_fields(self) -> None:
# This test is designed to ensure that all fields are autoserializable
# If a field is added or removed, this test will warn us so we can fix the rest of the related tests
with Environment.temporary_environment() as env:
instance = Publication(environment=env, service=None)
self.assertSetEqual(set(f[0] for f in instance._autoserializable_fields()), EXPECTED_FIELDS)

View File

@@ -93,15 +93,14 @@ def ensure_data(
db_data['id'] = db_data['uuid']
del db_data['uuid']
errors = compare_dicts(
dct, db_data, ignore_keys=ignore_keys, ignore_values=ignore_values
)
errors = compare_dicts(dct, db_data, ignore_keys=ignore_keys, ignore_values=ignore_values)
if errors:
logger.info('Errors found: %s', errors)
return False
return True
def random_ip_v4() -> str:
"""
Returns a random ip v4 address
@@ -110,6 +109,7 @@ def random_ip_v4() -> str:
return '.'.join(str(random.randint(0, 255)) for _ in range(4)) # nosec
def random_ip_v6() -> str:
"""
Returns a random ip v6 address
@@ -118,6 +118,7 @@ def random_ip_v6() -> str:
return ':'.join('{:04x}'.format(random.randint(0, 65535)) for _ in range(8)) # nosec
def random_mac() -> str:
"""
Returns a random mac address
@@ -126,6 +127,7 @@ def random_mac() -> str:
return ':'.join('{:02x}'.format(random.randint(0, 255)) for _ in range(6)) # nosec
def random_hostname() -> str:
"""
Returns a random hostname