1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-12 04:58:34 +03:00

Adapting Xen to New generics

This commit is contained in:
Adolfo Gómez García 2024-05-12 20:48:40 +02:00
parent 40f5941db1
commit 67bd9b8bd3
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
10 changed files with 1047 additions and 1254 deletions

View File

@ -80,8 +80,8 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
max_state_checks: typing.ClassVar[int] = 20
# How many "retries" operation on same state will be allowed before giving up
max_retries: typing.ClassVar[int] = consts.services.MAX_RETRIES
# If keep_state_sets_error is true, and an error occurs, the machine is set to FINISHED instead of ERROR
keep_state_sets_error: typing.ClassVar[bool] = False
# If store_error_as_finished is true, and an error occurs, the machine is set to FINISHED instead of ERROR
store_error_as_finished: typing.ClassVar[bool] = False
# If must wait untill finish queue for destroying the machine
wait_until_finish_to_destroy: typing.ClassVar[bool] = False
@ -236,7 +236,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
logger.debug('Keep on error is enabled, not removing machine')
self._set_queue(
[types.services.Operation.FINISH]
if self.keep_state_sets_error
if self.store_error_as_finished
else [types.services.Operation.ERROR]
)
return types.states.TaskState.FINISHED

View File

@ -33,23 +33,20 @@ import enum
import pickle # nosec: not insecure, we are loading our own data
import logging
import typing
import collections.abc
from uds.core import services, consts, types
from uds.core import consts, types
from uds.core.services.generics.dynamic.userservice import DynamicUserService
from uds.core.util import autoserializable
from .xen_client import XenPowerState
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds import models
from .service import XenLinkedService
from .publication import XenPublication
logger = logging.getLogger(__name__)
class Operation(enum.IntEnum):
class OldOperation(enum.IntEnum):
"""
Operations for deployment
"""
@ -70,24 +67,31 @@ class Operation(enum.IntEnum):
UNKNOWN = 99
@staticmethod
def from_int(value: int) -> 'Operation':
def from_int(value: int) -> 'OldOperation':
try:
return Operation(value)
return OldOperation(value)
except ValueError:
return Operation.UNKNOWN
return OldOperation.UNKNOWN
def as_operation(self) -> types.services.Operation:
return {
OldOperation.CREATE: types.services.Operation.CREATE,
OldOperation.START: types.services.Operation.START,
OldOperation.STOP: types.services.Operation.STOP,
OldOperation.SUSPEND: types.services.Operation.SUSPEND,
OldOperation.REMOVE: types.services.Operation.DELETE,
OldOperation.WAIT: types.services.Operation.WAIT,
OldOperation.ERROR: types.services.Operation.ERROR,
OldOperation.FINISH: types.services.Operation.FINISH,
OldOperation.RETRY: types.services.Operation.RETRY,
OldOperation.CONFIGURE: types.services.Operation.CREATE_COMPLETED,
OldOperation.PROVISION: types.services.Operation.CREATE_COMPLETED,
OldOperation.WAIT_SUSPEND: types.services.Operation.NOP,
}.get(self, types.services.Operation.UNKNOWN)
class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializable):
# : Recheck every six seconds by default (for task methods)
suggested_delay = 7
_name = autoserializable.StringField(default='')
_ip = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='')
_vmid = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
class XenLinkedDeployment(DynamicUserService, autoserializable.AutoSerializable):
_task = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]()
def initialize(self) -> None:
self._queue = []
@ -113,196 +117,35 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
self._mac = vals[3].decode('utf8')
self._vmid = vals[4].decode('utf8')
self._reason = vals[5].decode('utf8')
self._queue = pickle.loads(vals[6]) # nosec: not insecure, we are loading our own data
self._queue = [
i.as_operation() for i in pickle.loads(vals[6])
] # nosec: not insecure, we are loading our own data
self._task = vals[7].decode('utf8')
self.mark_for_upgrade() # Force upgrade
def get_name(self) -> str:
if not self._name:
try:
self._name = self.name_generator().get(
self.service().get_basename(), self.service().get_lenname()
)
except KeyError:
return consts.NO_MORE_NAMES
return self._name
def set_ip(self, ip: str) -> None:
logger.debug('Setting IP to %s', ip)
self._ip = ip
def get_unique_id(self) -> str:
if not self._mac:
self._mac = self.mac_generator().get(self.service().get_macs_range())
return self._mac
def get_ip(self) -> str:
return self._ip
def set_ready(self) -> types.states.TaskState:
if self.cache.get('ready') == '1':
return types.states.TaskState.FINISHED
try:
state = self.service().get_machine_power_state(self._vmid)
if state != XenPowerState.running:
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
self.cache.put('ready', '1', 30)
except Exception as e:
# On case of exception, log an an error and return as if the operation was executed
self.do_log(types.log.LogLevel.ERROR, 'Error setting machine state: {}'.format(e))
# return self.__error('Machine is not available anymore')
return types.states.TaskState.FINISHED
def reset(self) -> types.states.TaskState:
if self._vmid:
self.service().reset_machine(self._vmid) # Reset in sync
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
# Here we will check for suspending the VM (when full ready)
logger.debug('Checking if cache 2 for %s', self._name)
if self._get_current_op() == Operation.WAIT:
logger.debug('Machine is ready. Moving to level 2')
self._pop_current_op() # Remove current state
return self._execute_queue()
# Do not need to go to level 2 (opWait is in fact "waiting for moving machine to cache level 2)
return types.states.TaskState.FINISHED
def deploy_for_user(self, user: 'models.User') -> types.states.TaskState:
"""
Deploys an service instance for an user.
"""
logger.debug('Deploying for user')
self._init_queue_for_deployment(False)
return self._execute_queue()
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys an service instance for cache
"""
self._init_queue_for_deployment(level == types.services.CacheLevel.L2)
return self._execute_queue()
def _init_queue_for_deployment(self, cache_l2: bool = False) -> None:
if cache_l2 is False:
self._queue = [
Operation.CREATE,
Operation.CONFIGURE,
Operation.PROVISION,
Operation.START,
Operation.FINISH,
OldOperation.CREATE,
OldOperation.CONFIGURE,
OldOperation.PROVISION,
OldOperation.START,
OldOperation.FINISH,
]
else:
self._queue = [
Operation.CREATE,
Operation.CONFIGURE,
Operation.PROVISION,
Operation.START,
Operation.WAIT,
Operation.WAIT_SUSPEND,
Operation.SUSPEND,
Operation.FINISH,
OldOperation.CREATE,
OldOperation.CONFIGURE,
OldOperation.PROVISION,
OldOperation.START,
OldOperation.WAIT,
OldOperation.WAIT_SUSPEND,
OldOperation.SUSPEND,
OldOperation.FINISH,
]
def _get_current_op(self) -> Operation:
if len(self._queue) == 0:
return Operation.FINISH
return self._queue[0]
def _pop_current_op(self) -> int:
if len(self._queue) == 0:
return Operation.FINISH
return self._queue.pop(0)
def _push_front_op(self, op: Operation) -> None:
self._queue.insert(0, op)
def _push_back_op(self, op: Operation) -> None:
self._queue.append(op)
def _error(self, reason: typing.Any) -> types.states.TaskState:
logger.debug('Setting error state, reason: %s', reason)
self.do_log(types.log.LogLevel.ERROR, reason)
if self._vmid != '': # Powers off and delete VM
try:
state = self.service().get_machine_power_state(self._vmid)
if state in (
XenPowerState.running,
XenPowerState.paused,
XenPowerState.suspended,
):
self.service().stop_machine(self._vmid, False) # In sync mode
self.service().remove_machine(self._vmid)
except Exception:
logger.debug('Can\'t set machine %s state to stopped', self._vmid)
self._queue = [Operation.ERROR]
self._reason = str(reason)
return types.states.TaskState.ERROR
def _execute_queue(self) -> types.states.TaskState:
self.__debug('executeQueue')
op = self._get_current_op()
if op == Operation.ERROR:
return types.states.TaskState.ERROR
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
Operation.STOP: self._stop_machine,
Operation.WAIT_SUSPEND: self._wait_suspend,
Operation.SUSPEND: self._suspend_machine,
Operation.WAIT: self._wait,
Operation.REMOVE: self._remove,
Operation.CONFIGURE: self._configure,
Operation.PROVISION: self._provision,
}
try:
operation: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
if not operation:
return self._error('Unknown operation found at execution queue ({0})'.format(op))
operation()
return types.states.TaskState.RUNNING
except Exception as e:
return self._error(e)
# Queue execution methods
def _retry(self) -> types.states.TaskState:
"""
Used to retry an operation
In fact, this will not be never invoked, unless we push it twice, because
check_state method will "pop" first item when a check operation returns types.states.DeployState.FINISHED
At executeQueue this return value will be ignored, and it will only be used at check_state
"""
return types.states.TaskState.FINISHED
def _wait(self) -> types.states.TaskState:
"""
Executes opWait, it simply waits something "external" to end
"""
return types.states.TaskState.RUNNING
def _create(self) -> str:
def op_create(self) -> None:
"""
Deploys a machine from template for user/cache
"""
@ -322,245 +165,24 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
if not self._task:
raise Exception('Can\'t create machine')
return types.states.TaskState.RUNNING
def _remove(self) -> str:
"""
Removes a machine from system
"""
state = self.service().get_machine_power_state(self._vmid)
if state not in (XenPowerState.halted, XenPowerState.suspended):
self._push_front_op(Operation.STOP)
self._execute_queue()
else:
self.service().remove_machine(self._vmid)
return types.states.TaskState.RUNNING
def _start_machine(self) -> str:
"""
Powers on the machine
"""
task = self.service().start_machine(self._vmid)
if task is not None:
self._task = task
else:
self._task = ''
return types.states.TaskState.RUNNING
def _stop_machine(self) -> str:
"""
Powers off the machine
"""
task = self.service().stop_machine(self._vmid)
if task is not None:
self._task = task
else:
self._task = ''
return types.states.TaskState.RUNNING
def _wait_suspend(self) -> str:
"""
Before suspending, wait for machine to have the SUSPEND feature
"""
self._task = ''
return types.states.TaskState.RUNNING
def _suspend_machine(self) -> str:
"""
Suspends the machine
"""
task = self.service().suspend_machine(self._vmid)
if task is not None:
self._task = task
else:
self._task = ''
return types.states.TaskState.RUNNING
def _configure(self) -> types.states.TaskState:
def op_create_completed(self) -> None:
"""
Provisions machine & changes the mac of the indicated nic
"""
self.service().configure_machine(self._vmid, self.get_unique_id())
with self.service().provider().get_connection() as api:
api.provision_vm(self._vmid, False) # Let's try this in "sync" mode, this must be fast enough
self.service().configure_machine(self._vmid, self.get_unique_id())
return types.states.TaskState.RUNNING
def _provision(self) -> types.states.TaskState:
"""
Makes machine usable on Xen
"""
self.service().provision_machine(self._vmid, False) # Let's try this in "sync" mode, this must be fast enough
return types.states.TaskState.RUNNING
# Check methods
def _create_checker(self) -> types.states.TaskState:
def op_create_checker(self) -> types.states.TaskState:
"""
Checks the state of a deploy for an user or cache
"""
state = self.service().check_task_finished(self._task)
if state[0]: # Finished
self._vmid = state[1]
return types.states.TaskState.FINISHED
with self.service().provider().get_connection() as api:
task_info = api.get_task_info(self._task)
if task_info.is_success():
self._vmid = task_info.result
return types.states.TaskState.FINISHED
elif task_info.is_failure():
raise Exception('Error deploying machine: {}'.format(task_info.result))
return types.states.TaskState.RUNNING
def _start_checker(self) -> types.states.TaskState:
"""
Checks if machine has started
"""
if self.service().check_task_finished(self._task)[0]:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def _stop_checker(self) -> types.states.TaskState:
"""
Checks if machine has stoped
"""
if self.service().check_task_finished(self._task)[0]:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def _wait_suspend_checker(self) -> types.states.TaskState:
if self.service().can_suspend_machine(self._vmid) is True:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def _suspend_checker(self) -> types.states.TaskState:
"""
Check if the machine has suspended
"""
if self.service().check_task_finished(self._task)[0]:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def removed_checker(self) -> types.states.TaskState:
"""
Checks if a machine has been removed
"""
return types.states.TaskState.FINISHED
def _configure_checker(self) -> types.states.TaskState:
"""
Checks if change mac operation has finished.
Changing nic configuration es 1-step operation, so when we check it here, it is already done
"""
return types.states.TaskState.FINISHED
def _provision_checker(self) -> types.states.TaskState:
return types.states.TaskState.FINISHED
def check_state(self) -> types.states.TaskState:
"""
Check what operation is going on, and acts acordly to it
"""
self.__debug('check_state')
op = self._get_current_op()
if op == Operation.ERROR:
return types.states.TaskState.ERROR
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
fncs: dict[int, typing.Optional[collections.abc.Callable[[], types.states.TaskState]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry,
Operation.WAIT: self._wait,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.WAIT_SUSPEND: self._wait_suspend_checker,
Operation.SUSPEND: self._suspend_checker,
Operation.REMOVE: self.removed_checker,
Operation.CONFIGURE: self._configure_checker,
Operation.PROVISION: self._provision_checker,
}
try:
chkFnc: typing.Optional[collections.abc.Callable[[], types.states.TaskState]] = fncs.get(op, None)
if chkFnc is None:
return self._error('Unknown operation found at check queue ({})'.format(op))
state = chkFnc()
if state == types.states.TaskState.FINISHED:
self._pop_current_op() # Remove runing op
return self._execute_queue()
return state
except Exception as e:
return self._error(e)
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Moves machines between cache levels
"""
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]
return self._execute_queue()
def error_reason(self) -> str:
return self._reason
def destroy(self) -> types.states.TaskState:
self.__debug('destroy')
# If executing something, wait until finished to remove it
# We simply replace the execution queue
op = self._get_current_op()
if op == Operation.ERROR:
return types.states.TaskState.FINISHED
if op == Operation.FINISH or op == Operation.WAIT:
self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
return self._execute_queue()
self._queue = [op, Operation.STOP, Operation.REMOVE, Operation.FINISH]
# Do not execute anything.here, just continue normally
return types.states.TaskState.RUNNING
def cancel(self) -> types.states.TaskState:
return self.destroy()
@staticmethod
def __op2str(op: Operation) -> str:
return {
Operation.CREATE: 'create',
Operation.START: 'start',
Operation.STOP: 'stop',
Operation.WAIT_SUSPEND: 'wait-suspend',
Operation.SUSPEND: 'suspend',
Operation.REMOVE: 'remove',
Operation.WAIT: 'wait',
Operation.ERROR: 'error',
Operation.FINISH: 'finish',
Operation.RETRY: 'retry',
Operation.CONFIGURE: 'configuring',
Operation.PROVISION: 'provisioning',
}.get(op, '????')
def __debug(self, txt: str) -> None:
logger.debug(
'State at %s: name: %s, ip: %s, mac: %s, vmid:%s, queue: %s',
txt,
self._name,
self._ip,
self._mac,
self._vmid,
[XenLinkedDeployment.__op2str(op) for op in self._queue],
)

View File

@ -37,7 +37,7 @@ from uds.core import types
from uds.core.services.generics.fixed.userservice import FixedUserService, Operation
from uds.core.util import autoserializable
from . import xen_client
from .xen import types as xen_types, exceptions as xen_exceptions
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -72,7 +72,7 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
try:
state = self.service().get_machine_power_state(self._vmid)
if state != xen_client.XenPowerState.running:
if state != xen_types.PowerState.RUNNING:
self._queue = [Operation.START, Operation.FINISH]
return self._execute_queue()
@ -99,8 +99,8 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
except Exception as e:
raise Exception('Machine not found on start machine') from e
if state != xen_client.XenPowerState.running:
self._task = self.service().start_machine(self._vmid) or ''
if state != xen_types.PowerState.RUNNING:
self._task = self.service().start_vm(self._vmid) or ''
def op_stop(self) -> None:
try:
@ -108,9 +108,9 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
except Exception as e:
raise Exception('Machine not found on stop machine') from e
if state == xen_client.XenPowerState.running:
if state == xen_types.PowerState.RUNNING:
logger.debug('Stopping machine %s', self._vmid)
self._task = self.service().stop_machine(self._vmid) or ''
self._task = self.service().stop_vm(self._vmid) or ''
# Check methods
def _check_task_finished(self) -> types.states.TaskState:
@ -119,7 +119,7 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
try:
finished, _per = self.service().check_task_finished(self._task)
except xen_client.XenFailure:
except xen_exceptions.XenFailure:
return types.states.TaskState.RUNNING # Try again later
except Exception as e: # Failed for some other reason
if isinstance(e.args[0], dict) and 'error_connection' in e.args[0]:

View File

@ -28,9 +28,9 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import logging
import typing
import collections.abc
from django.utils.translation import gettext_noop as _
@ -42,7 +42,7 @@ from uds.core.util import fields
from .service import XenLinkedService
from .service_fixed import XenFixedService
from .xen_client import XenServer
from .xen import client
# from uds.core.util import validators
@ -137,18 +137,19 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
old_field_name='hostBackup',
)
_api: typing.Optional[XenServer]
_cached_api: typing.Optional[client.XenServer]
_use_count: int = 0
# XenServer engine, right now, only permits a connection to one server and only one per instance
# If we want to connect to more than one server, we need keep locked access to api, change api server, etc..
# We have implemented an "exclusive access" client that will only connect to one server at a time (using locks)
# and this way all will be fine
def _get_api(self, force: bool = False) -> XenServer:
def _api(self) -> client.XenServer:
"""
Returns the connection API object for XenServer (using XenServersdk)
"""
if not self._api or force:
self._api = XenServer(
if not self._cached_api:
self._cached_api = client.XenServer(
self.host.value,
self.host_backup.value,
443,
@ -158,7 +159,21 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
self.verify_ssl.as_bool(),
)
return self._api
return self._cached_api
@contextlib.contextmanager
def get_connection(self) -> typing.Iterator[client.XenServer]:
"""
Context manager for XenServer API
"""
self._use_count += 1
try:
yield self._api()
finally:
self._use_count -= 1
if self._use_count == 0 and self._cached_api:
self._cached_api.logout()
self._cached_api = None
# There is more fields type, but not here the best place to cover it
def initialize(self, values: 'types.core.ValuesType') -> None:
@ -167,7 +182,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
"""
# Just reset _api connection variable
self._api = None
self._cached_api = None
def test_connection(self) -> None:
"""
@ -177,274 +192,7 @@ class XenProvider(ServiceProvider): # pylint: disable=too-many-public-methods
True if all went fine, false if id didn't
"""
self._get_api().test()
def check_task_finished(self, task: typing.Optional[str]) -> tuple[bool, str]:
"""
Checks a task state.
Returns None if task is Finished
Returns a number indicating % of completion if running
Raises an exception with status else ('cancelled', 'unknown', 'failure')
"""
if not task:
return True, ''
ts = self._get_api().get_task_info(task)
logger.debug('Task status: %s', ts)
if ts['status'] == 'running':
return False, ts['progress']
if ts['status'] == 'success':
return True, ts['result']
# Any other state, raises an exception
raise Exception(ts) # Should be error message
def list_machines(self, force: bool = False) -> list[collections.abc.MutableMapping[str, typing.Any]]:
"""
Obtains the list of machines inside XenServer.
Machines starting with UDS are filtered out
Args:
force: If true, force to update the cache, if false, tries to first
get data from cache and, if valid, return this.
Returns
An array of dictionaries, containing:
'name'
'id'
'cluster_id'
"""
return [m for m in self._get_api().list_machines() if m['name'][:3] != 'UDS']
def list_storages(self, force: bool = False) -> list[dict[str, typing.Any]]:
"""
Obtains the list of storages inside XenServer.
Args:
force: If true, force to update the cache, if false, tries to first
get data from cache and, if valid, return this.
Returns
An array of dictionaries, containing:
'name'
'id'
'size'
'used'
"""
return self._get_api().list_srs()
def get_storage_info(
self, storageId: str, force: bool = False
) -> collections.abc.MutableMapping[str, typing.Any]:
"""
Obtains the storage info
Args:
storageId: Id of the storage to get information about it
force: If true, force to update the cache, if false, tries to first
get data from cache and, if valid, return this.
Returns
A dictionary with following values
'id' -> Storage id
'name' -> Storage name
'type' -> Storage type ('data', 'iso')
'available' -> Space available, in bytes
'used' -> Space used, in bytes
# 'active' -> True or False --> This is not provided by api?? (api.storagedomains.get)
"""
return self._get_api().get_sr_info(storageId)
def get_networks(
self, force: bool = False
) -> collections.abc.Iterable[collections.abc.MutableMapping[str, typing.Any]]:
return self._get_api().list_networks()
def clone_for_template(self, name: str, comments: str, machineId: str, sr: str) -> str:
task = self._get_api().clone_machine(machineId, name, sr)
logger.debug('Task for cloneForTemplate: %s', task)
return task
def convert_to_template(self, vmid: str, shadow_multiplier: int = 4) -> None:
"""
Publish the machine (makes a template from it so we can create COWs) and returns the template id of
the creating machine
Args:
name: Name of the machine (care, only ascii characters and no spaces!!!)
machineId: id of the machine to be published
clusterId: id of the cluster that will hold the machine
storageId: id of the storage tuat will contain the publication AND linked clones
displayType: type of display (for XenServer admin interface only)
Returns
Raises an exception if operation could not be acomplished, or returns the id of the template being created.
"""
self._get_api().convert_to_template(vmid, shadow_multiplier)
def remove_template(self, templateId: str) -> None:
"""
Removes a template from XenServer server
Returns nothing, and raises an Exception if it fails
"""
self._get_api().remove_template(templateId)
def start_deploy_from_template(self, name: str, comments: str, template_id: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
Args:
name: Name (sanitized) of the machine
comments: Comments for machine
templateId: Id of the template to deploy from
clusterId: Id of the cluster to deploy to
displayType: 'vnc' or 'spice'. Display to use ad XenServer admin interface
memoryMB: Memory requested for machine, in MB
guaranteedMB: Minimum memory guaranteed for this machine
Returns:
Id of the machine being created form template
"""
return self._get_api().start_deploy_from_template(template_id, name)
def get_machine_power_state(self, vmid: str) -> str:
"""
Returns current machine power state
"""
return self._get_api().get_machine_power_state(vmid)
def get_machine_name(self, vmid: str) -> str:
return self._get_api().get_machine_info(vmid).get('name_label', '')
def list_folders(self) -> list[str]:
return self._get_api().list_folders()
def get_machine_folder(self, vmid: str) -> str:
return self._get_api().get_machine_folder(vmid)
def get_machines_from_folder(
self, folder: str, retrieve_names: bool = False
) -> list[dict[str, typing.Any]]:
return self._get_api().get_machines_from_folder(folder, retrieve_names)
def start_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer.
This start also "resume" suspended/paused machines
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().start_machine(vmid, as_async)
def stop_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().stop_machine(vmid, as_async)
def reset_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().reset_machine(vmid, as_async)
def can_suspend_machine(self, vmid: str) -> bool:
"""
The machine can be suspended only when "suspend" is in their operations list (mush have xentools installed)
Args:
machineId: Id of the machine
Returns:
True if the machien can be suspended
"""
return self._get_api().can_suspend_machine(vmid)
def suspend_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().suspend_machine(vmid, as_async)
def resume_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().resume_machine(vmid, as_async)
def shutdown_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
return self._get_api().shutdown_machine(vmid, as_async)
def remove_machine(self, vmid: str) -> None:
"""
Tries to delete a machine. No check is done, it is simply requested to XenServer
Args:
machineId: Id of the machine
Returns:
"""
self._get_api().remove_machine(vmid)
def configure_machine(self, vmid: str, netId: str, mac: str, memory: int) -> None:
self._get_api().configure_machine(vmid, mac={'network': netId, 'mac': mac}, memory=memory)
def provision_machine(self, vmid: str, as_async: bool = True) -> str:
return self._get_api().provision_machine(vmid, as_async=as_async)
def get_first_ip(self, vmid: str) -> str:
return self._get_api().get_first_ip(vmid)
def get_first_mac(self, vmid: str) -> str:
return self._get_api().get_first_mac(vmid)
def create_snapshot(self, vmid: str, name: str) -> str:
return self._get_api().create_snapshot(vmid, name)
def restore_snapshot(self, snapshot_id: str) -> str:
return self._get_api().restore_snapshot(snapshot_id)
def remove_snapshot(self, snapshot_id: str) -> str:
return self._get_api().remove_snapshot(snapshot_id)
def list_snapshots(self, vmid: str, full_info: bool = False) -> list[dict[str, typing.Any]]:
return self._get_api().list_snapshots(vmid)
self._api().test()
def get_macs_range(self) -> str:
return self.macs_range.value

View File

@ -33,8 +33,9 @@ import logging
import typing
from django.utils.translation import gettext_noop as _
from uds.core import services, exceptions, types
from uds.core.util import fields, validators
from uds.core import exceptions, types
from uds.core.services.generics.dynamic.service import DynamicService
from uds.core.util import validators
from uds.core.ui import gui
from .publication import XenPublication
@ -43,15 +44,15 @@ from .deployment import XenLinkedDeployment
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from .provider import XenProvider
from uds.core.services.generics.dynamic.publication import DynamicPublication
from uds.core.services.generics.dynamic.userservice import DynamicUserService
logger = logging.getLogger(__name__)
class XenLinkedService(services.Service): # pylint: disable=too-many-public-methods
class XenLinkedService(DynamicService): # pylint: disable=too-many-public-methods
"""
Xen Linked clones service. This is based on creating a template from selected vm, and then use it to
"""
# : Name to show the administrator. This string will be translated BEFORE
@ -154,8 +155,13 @@ class XenLinkedService(services.Service): # pylint: disable=too-many-public-met
required=True,
)
basename = fields.basename_field(order=114)
lenname = fields.lenname_field(order=115)
remove_duplicates = DynamicService.remove_duplicates
maintain_on_error = DynamicService.maintain_on_error
try_soft_shutdown = DynamicService.try_soft_shutdown
basename = DynamicService.basename
lenname = DynamicService.lenname
def initialize(self, values: types.core.ValuesType) -> None:
"""
@ -178,41 +184,34 @@ class XenLinkedService(services.Service): # pylint: disable=too-many-public-met
# This is that value is always '', so if we want to change something, we have to do it
# at defValue
machines_list = [gui.choice_item(m['id'], m['name']) for m in self.provider().list_machines()]
storages_list: list[types.ui.ChoiceItem] = []
for storage in self.provider().list_storages():
space, free = (
storage['size'] / 1024,
(storage['size'] - storage['used']) / 1024,
)
storages_list.append(
gui.choice_item(
storage['id'],
"%s (%4.2f Gb/%4.2f Gb)" % (storage['name'], space, free),
with self.provider().get_connection() as api:
machines_list = [gui.choice_item(m.opaque_ref, m.name) for m in api.list_vms()]
storages_list: list[types.ui.ChoiceItem] = []
for storage in api.list_srs():
space, free = (
storage.physical_size / 1024,
(storage.physical_size - storage.physical_utilisation) / 1024,
)
)
network_list = [gui.choice_item(net['id'], net['name']) for net in self.provider().get_networks()]
storages_list.append(
gui.choice_item(storage.opaque_ref, f'{storage.name} ({space:.2f} Gb/{free:.2f} Gb)')
)
network_list = [gui.choice_item(net.opaque_ref, net.name) for net in api.list_networks()]
self.machine.set_choices(machines_list)
self.datastore.set_choices(storages_list)
self.network.set_choices(network_list)
def check_task_finished(self, task: str) -> tuple[bool, str]:
return self.provider().check_task_finished(task)
def has_datastore_space(self) -> None:
# Get storages for that datacenter
info = self.provider().get_storage_info(self.datastore.value)
logger.debug('Checking datastore space for %s: %s', self.datastore.value, info)
availableGB = (info['size'] - info['used']) / 1024
if availableGB < self.min_space_gb.as_int():
raise Exception(
'Not enough free space available: (Needs at least {} GB and there is only {} GB '.format(
self.min_space_gb.as_int(), availableGB
with self.provider().get_connection() as api:
info = api.get_sr_info(self.datastore.value)
logger.debug('Checking datastore space for %s: %s', self.datastore.value, info)
availableGB = (info.physical_size - info.physical_utilisation) // 1024
if availableGB < self.min_space_gb.as_int():
raise Exception(
'Not enough free space available: (Needs at least {} GB and there is only {} GB '.format(
self.min_space_gb.as_int(), availableGB
)
)
)
def sanitized_name(self, name: str) -> str:
"""
@ -240,18 +239,19 @@ class XenLinkedService(services.Service): # pylint: disable=too-many-public-met
self.datastore.value,
)
# Checks datastore available space, raises exeception in no min available
self.has_datastore_space()
with self.provider().get_connection() as api:
self.has_datastore_space()
return self.provider().clone_for_template(name, comments, self.machine.value, self.datastore.value)
return api.clone_vm(self.machine.value, name, self.datastore.value)
def convert_to_template(self, machineId: str) -> None:
"""
converts machine to template
"""
self.provider().convert_to_template(machineId, self.shadow.value)
with self.provider().get_connection() as api:
api.convert_to_template(machineId, self.shadow.value)
def start_deploy_from_template(self, name: str, comments: str, templateId: str) -> str:
def start_deploy_from_template(self, name: str, comments: str, template_opaque_ref: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
@ -266,129 +266,72 @@ class XenLinkedService(services.Service): # pylint: disable=too-many-public-met
Returns:
Id of the machine being created form template
"""
logger.debug('Deploying from template %s machine %s', templateId, name)
self.has_datastore_space()
logger.debug('Deploying from template %s machine %s', template_opaque_ref, name)
return self.provider().start_deploy_from_template(name, comments, templateId)
with self.provider().get_connection() as api:
self.has_datastore_space()
def remove_template(self, templateId: str) -> None:
return api.start_deploy_from_template(template_opaque_ref, name)
def remove_template(self, template_opaque_ref: str) -> None:
"""
invokes removeTemplate from parent provider
"""
self.provider().remove_template(templateId)
with self.provider().get_connection() as api:
api.delete_template(template_opaque_ref)
def get_machine_power_state(self, machineId: str) -> str:
def configure_machine(self, vm_opaque_ref: str, mac: str) -> None:
with self.provider().get_connection() as api:
api.configure_vm(
vm_opaque_ref, net_info={'network': self.network.value, 'mac': mac}, memory=self.memory.value
)
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
"""
Invokes getMachineState from parent provider
Args:
machineId: If of the machine to get state
Returns:
one of this values:
Returns the ip of the machine
If cannot be obtained, MUST raise an exception
"""
return self.provider().get_machine_power_state(machineId)
return '' # No ip will be get, UDS will assign one (from actor)
def start_machine(self, machineId: str, asnc: bool = True) -> typing.Optional[str]:
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
"""
Tries to start a machine. No check is done, it is simply requested to Xen.
This start also "resume" suspended/paused machines
Args:
machineId: Id of the machine
Returns:
For
"""
return self.provider().start_machine(machineId, asnc)
return self.mac_generator().get(self.provider().get_macs_range())
def stop_machine(self, machineId: str, asnc: bool = True) -> typing.Optional[str]:
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
"""
Tries to stop a machine. No check is done, it is simply requested to Xen
Args:
machineId: Id of the machine
Returns:
Returns if the machine is ready and running
"""
return self.provider().stop_machine(machineId, asnc)
with self.provider().get_connection() as api:
vminfo = api.get_vm_info(vmid)
if vminfo.power_state.is_running():
return True
return False
def reset_machine(self, vmid: str, asnc: bool = True) -> typing.Optional[str]:
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
"""
Tries to stop a machine. No check is done, it is simply requested to Xen
Args:
machineId: Id of the machine
Returns:
Starts the machine
Can return a task, or None if no task is returned
"""
return self.provider().reset_machine(vmid, asnc)
with self.provider().get_connection() as api:
api.start_vm(vmid, as_async=False)
def can_suspend_machine(self, machineId: str) -> bool:
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
"""
The machine can be suspended only when "suspend" is in their operations list (mush have xentools installed)
Args:
machineId: Id of the machine
Returns:
True if the machien can be suspended
Stops the machine
Can return a task, or None if no task is returned
"""
return self.provider().can_suspend_machine(machineId)
with self.provider().get_connection() as api:
api.stop_vm(vmid, as_async=False)
def suspend_machine(self, machineId: str, asnc: bool = True) -> typing.Optional[str]:
def shutdown(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
with self.provider().get_connection() as api:
api.shutdown_vm(vmid, as_async=False)
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
"""
Tries to suspend a machine. No check is done, it is simply requested to Xen
Args:
machineId: Id of the machine
Returns:
Removes the machine, or queues it for removal, or whatever :)
"""
return self.provider().suspend_machine(machineId, asnc)
def resume_machine(self, machineId: str, asnc: bool = True) -> typing.Optional[str]:
"""
Tries to resume a machine. No check is done, it is simply requested to Xen
Args:
machineId: Id of the machine
Returns:
"""
return self.provider().suspend_machine(machineId, asnc)
def remove_machine(self, machineId: str) -> None:
"""
Tries to delete a machine. No check is done, it is simply requested to Xen
Args:
machineId: Id of the machine
Returns:
"""
self.provider().remove_machine(machineId)
def configure_machine(self, vmid: str, mac: str) -> None:
self.provider().configure_machine(vmid, self.network.value, mac, self.memory.value)
def provision_machine(self, vmid: str, as_async: bool = True) -> str:
return self.provider().provision_machine(vmid, as_async)
def get_macs_range(self) -> str:
"""
Returns de selected mac range
"""
return self.provider().get_macs_range()
def get_basename(self) -> str:
"""
Returns the base name
"""
return self.basename.value
def get_lenname(self) -> int:
"""
Returns the length of numbers part
"""
return int(self.lenname.value)
with self.provider().get_connection() as api:
api.delete_vm(vmid)

View File

@ -104,25 +104,13 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
# This is that value is always '', so if we want to change something, we have to do it
# at defValue
self.prov_uuid.value = self.provider().get_uuid()
self.folder.set_choices([gui.choice_item(folder, folder) for folder in self.provider().list_folders()])
with self.provider().get_connection() as api:
self.folder.set_choices([gui.choice_item(folder, folder) for folder in api.list_folders()])
def provider(self) -> 'XenProvider':
return typing.cast('XenProvider', super().provider())
def get_machine_power_state(self, vmid: str) -> str:
"""
Invokes getMachineState from parent provider
Args:
machineId: If of the machine to get state
Returns:
one of this values:
"""
return self.provider().get_machine_power_state(vmid)
def start_machine(self, vmid: str) -> typing.Optional[str]:
def start_vm(self, vmid: str) -> typing.Optional[str]:
"""
Tries to start a machine. No check is done, it is simply requested to Xen.
@ -133,9 +121,10 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
return self.provider().start_machine(vmid)
with self.provider().get_connection() as api:
api.start_vm(vmid)
def stop_machine(self, vmid: str) -> typing.Optional[str]:
def stop_vm(self, vmid: str) -> typing.Optional[str]:
"""
Tries to stop a machine. No check is done, it is simply requested to Xen
@ -144,7 +133,8 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
return self.provider().stop_machine(vmid)
with self.provider().get_connection() as api:
return api.stop_vm(vmid)
def reset_machine(self, vmid: str) -> typing.Optional[str]:
"""
@ -155,32 +145,32 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
Returns:
"""
return self.provider().reset_machine(vmid)
with self.provider().get_connection() as api:
return api.reset_vm(vmid)
def shutdown_machine(self, vmid: str) -> typing.Optional[str]:
with self.provider().get_connection() as api:
return api.shutdown_vm(vmid)
return self.provider().shutdown_machine(vmid)
def check_task_finished(self, task: str) -> tuple[bool, str]:
return self.provider().check_task_finished(task)
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
def is_avaliable(self) -> bool:
return self.provider().is_available()
def enumerate_assignables(self) -> collections.abc.Iterable[types.ui.ChoiceItem]:
# Obtain machines names and ids for asignables
vms: dict[str, str] = {
machine['id']: machine['name']
for machine in self.provider().get_machines_from_folder(self.folder.value, retrieve_names=True)
}
with self.provider().get_connection() as api:
vms: dict[str, str] = {
machine.opaque_ref: machine.name for machine in api.list_vms_from_folder(self.folder.value)
}
with self._assigned_access() as assigned_vms:
return [
gui.choice_item(k, vms[k])
for k in self.machines.as_list()
if k not in assigned_vms
and k in vms # Only machines not assigned, and that exists on provider will be available
]
with self._assigned_access() as assigned_vms:
return [
gui.choice_item(k, vms[k])
for k in self.machines.as_list()
if k not in assigned_vms
and k in vms # Only machines not assigned, and that exists on provider will be available
]
def assign_from_assignables(
self, assignable_id: str, user: 'models.User', userservice_instance: 'services.UserService'
@ -195,64 +185,68 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
def snapshot_creation(self, userservice_instance: FixedUserService) -> None:
userservice_instance = typing.cast(XenFixedUserService, userservice_instance)
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
with self.provider().get_connection() as api:
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
snapshots = api.list_snapshots(vmid, full_info=False) # Only need ids, to check if there is any snapshot
snapshots = [i['id'] for i in self.provider().list_snapshots(vmid)]
snapshot = snapshots[0] if snapshots else None
logger.debug('Using snapshots')
# If no snapshot exists for this vm, try to create one for it on background
# Lauch an snapshot. We will not wait for it to finish, but instead let it run "as is"
try:
if not snapshot: # No snapshot, try to create one
logger.debug('Not current snapshot')
# We don't need the snapshot nor the task, will simply restore to newer snapshot on remove
self.provider().create_snapshot(
vmid,
name='UDS Snapshot',
)
except Exception as e:
self.do_log(types.log.LogLevel.WARNING, 'Could not create SNAPSHOT for this VM. ({})'.format(e))
logger.debug('Using snapshots')
# If no snapshot exists for this vm, try to create one for it on background
# Lauch an snapshot. We will not wait for it to finish, but instead let it run "as is"
try:
if not snapshots: # No snapshot, try to create one
logger.debug('Not current snapshot')
# We don't need the snapshot nor the task, will simply restore to newer snapshot on remove
api.create_snapshot(
vmid,
name='UDS Snapshot',
)
except Exception as e:
self.do_log(types.log.LogLevel.WARNING, 'Could not create SNAPSHOT for this VM. ({})'.format(e))
def snapshot_recovery(self, userservice_instance: FixedUserService) -> None:
userservice_instance = typing.cast(XenFixedUserService, userservice_instance)
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
with self.provider().get_connection() as api:
if self.use_snapshots.as_bool():
vmid = userservice_instance._vmid
snapshots = [i['id'] for i in self.provider().list_snapshots(vmid)]
snapshot = snapshots[0] if snapshots else None
snapshots = api.list_snapshots(vmid)
if snapshot:
try:
userservice_instance._task = self.provider().restore_snapshot(snapshot['id'])
except Exception as e:
self.do_log(types.log.LogLevel.WARNING, 'Could not restore SNAPSHOT for this VM. ({})'.format(e))
if snapshots:
try:
# 0 is most recent snapshot
userservice_instance._task = api.restore_snapshot(snapshots[0].opaque_ref)
except Exception as e:
self.do_log(
types.log.LogLevel.WARNING, 'Could not restore SNAPSHOT for this VM. ({})'.format(e)
)
def get_and_assign(self) -> str:
found_vmid: typing.Optional[str] = None
with self._assigned_access() as assigned_vms:
try:
for checking_vmid in self.sorted_assignables_list():
if checking_vmid not in assigned_vms: # Not assigned
# Check that the machine exists...
try:
_vm_name = self.provider().get_machine_name(checking_vmid)
found_vmid = checking_vmid
break
except Exception: # Notifies on log, but skipt it
self.provider().do_log(
types.log.LogLevel.WARNING, 'Machine {} not accesible'.format(found_vmid)
)
logger.warning(
'The service has machines that cannot be checked on xen (connection error or machine has been deleted): %s',
found_vmid,
)
with self.provider().get_connection() as api:
with self._assigned_access() as assigned_vms:
try:
for checking_vmid in self.sorted_assignables_list():
if checking_vmid not in assigned_vms: # Not assigned
# Check that the machine exists...
try:
api.get_vm_info(checking_vmid) # Will raise an exception if not exists
found_vmid = checking_vmid
break
except Exception: # Notifies on log, but skipt it
self.provider().do_log(
types.log.LogLevel.WARNING, 'Machine {} not accesible'.format(found_vmid)
)
logger.warning(
'The service has machines that cannot be checked on xen (connection error or machine has been deleted): %s',
found_vmid,
)
if found_vmid:
assigned_vms.add(str(found_vmid))
except Exception: #
raise Exception('No machine available')
if found_vmid:
assigned_vms.add(str(found_vmid))
except Exception: #
raise Exception('No machine available')
if not found_vmid:
raise Exception('All machines from list already assigned.')
@ -260,13 +254,16 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
return str(found_vmid)
def get_mac(self, vmid: str) -> str:
return self.provider().get_first_mac(vmid)
with self.provider().get_connection() as conn:
return conn.get_first_mac(vmid)
def get_ip(self, vmid: str) -> str:
return self.provider().get_first_ip(vmid)
with self.provider().get_connection() as conn:
return conn.get_first_ip(vmid)
def get_name(self, vmid: str) -> str:
return self.provider().get_machine_name(vmid)
with self.provider().get_connection() as conn:
return conn.get_vm_info(vmid).name
def remove_and_free(self, vmid: str) -> types.states.TaskState:
try:

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2023 Virtual Cable S.L.U.
# Copyright (c) 2014-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -25,17 +25,22 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import ssl
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import xmlrpc.client
import logging
import typing
from uds.core import consts
from uds.core.util.decorators import cached
from uds.core.util import security
import XenAPI # pyright: ignore
from . import types as xen_types
from . import exceptions
logger = logging.getLogger(__name__)
@ -43,69 +48,10 @@ TAG_TEMPLATE = "uds-template"
TAG_MACHINE = "uds-machine"
class XenFault(Exception):
pass
def cache_key_helper(server_api: 'XenServer') -> str:
return server_api._url # pyright: ignore[reportPrivateUsage]
class XenFailure(XenAPI.Failure, XenFault):
exBadVmPowerState = 'VM_BAD_POWER_STATE'
exVmMissingPVDrivers = 'VM_MISSING_PV_DRIVERS'
exHandleInvalid = 'HANDLE_INVALID'
exHostIsSlave = 'HOST_IS_SLAVE'
exSRError = 'SR_BACKEND_FAILURE_44'
def __init__(self, details: typing.Optional[list[typing.Any]] = None):
details = [] if details is None else details
super(XenFailure, self).__init__(details)
def isHandleInvalid(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exHandleInvalid
def needs_xen_tools(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exVmMissingPVDrivers
def bad_power_state(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exBadVmPowerState
def is_slave(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exHostIsSlave
def as_human_readable(self) -> str:
try:
error_list = {
XenFailure.exBadVmPowerState: 'Machine state is invalid for requested operation (needs {2} and state is {3})',
XenFailure.exVmMissingPVDrivers: 'Machine needs Xen Server Tools to allow requested operation',
XenFailure.exHostIsSlave: 'The connected host is an slave, try to connect to {1}',
XenFailure.exSRError: 'Error on SR: {2}',
XenFailure.exHandleInvalid: 'Invalid reference to {1}',
}
err = error_list.get(typing.cast(typing.Any, self.details[0]), 'Error {0}')
return err.format(*typing.cast(list[typing.Any], self.details))
except Exception:
return 'Unknown exception: {0}'.format(self.details)
def __str__(self) -> str:
return self.as_human_readable()
class XenException(XenFault):
def __init__(self, message: typing.Any):
XenFault.__init__(self, message)
logger.debug('Exception create: %s', message)
class XenPowerState: # pylint: disable=too-few-public-methods
halted: str = 'Halted'
running: str = 'Running'
suspended: str = 'Suspended'
paused: str = 'Paused'
class XenServer: # pylint: disable=too-many-public-methods
_originalHost: str
_host: str
@ -129,14 +75,14 @@ class XenServer: # pylint: disable=too-many-public-methods
port: int,
username: str,
password: str,
useSSL: bool = False,
verifySSL: bool = False,
ssl: bool = False,
verify_ssl: bool = False,
):
self._originalHost = self._host = host
self._host_backup = host_backup or ''
self._port = str(port)
self._use_ssl = bool(useSSL)
self._verify_ssl = bool(verifySSL)
self._use_ssl = bool(ssl)
self._verify_ssl = bool(verify_ssl)
self._protocol = 'http' + ('s' if self._use_ssl else '') + '://'
self._url = ''
self._logged_in = False
@ -149,12 +95,9 @@ class XenServer: # pylint: disable=too-many-public-methods
def to_mb(number: typing.Union[str, int]) -> int:
return int(number) // (1024 * 1024)
def check_login(self) -> bool:
if not self._logged_in:
self.login(swithc_to_master=True)
return self._logged_in
def get_xenapi_property(self, prop: str) -> typing.Any:
# Properties to access private vars
# p
def _get_xenapi_property(self, prop: str) -> typing.Any:
if not self.check_login():
raise Exception("Can't log in")
return getattr(self._session.xenapi, prop)
@ -162,50 +105,48 @@ class XenServer: # pylint: disable=too-many-public-methods
# Properties to fast access XenApi classes
@property
def Async(self) -> typing.Any:
return self.get_xenapi_property('Async')
return self._get_xenapi_property('Async')
@property
def task(self) -> typing.Any:
return self.get_xenapi_property('task')
return self._get_xenapi_property('task')
@property
def VM(self) -> typing.Any:
return self.get_xenapi_property('VM')
return self._get_xenapi_property('VM')
@property
def SR(self) -> typing.Any:
return self.get_xenapi_property('SR')
return self._get_xenapi_property('SR')
@property
def pool(self) -> typing.Any:
return self.get_xenapi_property('pool')
return self._get_xenapi_property('pool')
@property
def host(self) -> typing.Any: # Host
return self.get_xenapi_property('host')
return self._get_xenapi_property('host')
@property
def network(self) -> typing.Any: # Networks
return self.get_xenapi_property('network')
return self._get_xenapi_property('network')
@property
def VIF(self) -> typing.Any: # Virtual Interface
return self.get_xenapi_property('VIF')
return self._get_xenapi_property('VIF')
@property
def VDI(self) -> typing.Any: # Virtual Disk Image
return self.get_xenapi_property('VDI')
return self._get_xenapi_property('VDI')
@property
def VBD(self) -> typing.Any: # Virtual Block Device
return self.get_xenapi_property('VBD')
return self._get_xenapi_property('VBD')
@property
def VM_guest_metrics(self) -> typing.Any:
return self.get_xenapi_property('VM_guest_metrics')
return self._get_xenapi_property('VM_guest_metrics')
# Properties to access private vars
# p
def has_pool(self) -> bool:
return self.check_login() and bool(self._pool_name)
@ -215,7 +156,7 @@ class XenServer: # pylint: disable=too-many-public-methods
return self.pool.get_name_label(pool)
# Login/Logout
def login(self, swithc_to_master: bool = False, backup_checked: bool = False) -> None:
def login(self, switch_to_master: bool = False, backup_checked: bool = False) -> None:
try:
# We recalculate here url, because we can "switch host" on any moment
self._url = self._protocol + self._host + ':' + self._port
@ -223,33 +164,30 @@ class XenServer: # pylint: disable=too-many-public-methods
transport = None
if self._use_ssl:
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
if self._verify_ssl is False:
context.verify_mode = ssl.CERT_NONE
else:
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context = security.create_client_sslcontext(verify=self._verify_ssl)
transport = xmlrpc.client.SafeTransport(context=context)
logger.debug('Transport: %s', transport)
self._session = XenAPI.Session(self._url, transport=transport)
self._session.xenapi.login_with_password(self._username, self._password)
self._session.xenapi.login_with_password(
self._username, self._password, '', 'UDS XenServer Connector'
)
self._logged_in = True
self._api_version = self._session.API_version
self._pool_name = str(self.get_pool_name())
except (
XenAPI.Failure
) as e: # XenAPI.Failure: ['HOST_IS_SLAVE', '172.27.0.29'] indicates that this host is an slave of 172.27.0.29, connect to it...
if swithc_to_master and e.details[0] == 'HOST_IS_SLAVE':
if switch_to_master and e.details[0] == 'HOST_IS_SLAVE':
logger.info(
'%s is an Slave, connecting to master at %s',
self._host,
typing.cast(typing.Any, e.details[1])
typing.cast(typing.Any, e.details[1]),
)
self._host = e.details[1]
self.login(backup_checked=backup_checked)
else:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
except Exception:
if self._host == self._host_backup or not self._host_backup or backup_checked:
logger.exception('Connection to master server is broken and backup connection unavailable.')
@ -258,213 +196,152 @@ class XenServer: # pylint: disable=too-many-public-methods
self._host = self._host_backup
self.login(backup_checked=True)
def test(self) -> None:
self.login(False)
def logout(self) -> None:
self._session.logout()
if self._logged_in:
try:
self._session.logout()
except Exception as e:
logger.warning('Error logging out: %s', e)
self._session = None
self._logged_in = False
self._session = None
self._pool_name = self._api_version = ''
def check_login(self) -> bool:
if not self._logged_in:
self.login(switch_to_master=True)
return self._logged_in
def test(self) -> None:
self.login(False)
def get_host(self) -> str:
return self._host
def set_host(self, host: str) -> None:
self._host = host
def get_task_info(self, task: str) -> dict[str, typing.Any]:
progress = 0
result: typing.Any = None
destroy_task = False
def get_task_info(self, task_opaque_ref: str) -> xen_types.TaskInfo:
try:
status = self.task.get_status(task)
logger.debug('Task %s in state %s', task, status)
if status == 'pending':
status = 'running'
progress = int(self.task.get_progress(task) * 100)
elif status == 'success':
result = self.task.get_result(task)
destroy_task = True
elif status == 'failure':
result = XenFailure(self.task.get_error_info(task))
destroy_task = True
task_info = xen_types.TaskInfo.from_dict(self.task.get_record(task_opaque_ref), task_opaque_ref)
except XenAPI.Failure as e:
logger.debug('XenServer Failure: %s', typing.cast(str, e.details[0]))
if e.details[0] == 'HANDLE_INVALID':
result = None
status = 'unknown'
progress = 0
else:
destroy_task = True
result = typing.cast(str, e.details[0])
status = 'failure'
except ConnectionError as e:
logger.debug('Connection error: %s', e)
result = 'Connection error'
status = 'failure'
except Exception as e:
logger.exception('Unexpected exception!')
result = str(e)
status = 'failure'
# Removes <value></value> if present
if result and not isinstance(result, XenFailure) and result.startswith('<value>'):
result = result[7:-8]
if destroy_task:
try:
self.task.destroy(task)
except Exception as e:
logger.warning('Destroy task %s returned error %s', task, str(e))
return {'result': result, 'progress': progress, 'status': str(status), 'connection_error': True}
return xen_types.TaskInfo.unknown_task(task_opaque_ref)
raise exceptions.XenFailure(e.details)
return task_info
@cached(prefix='xen_srs', timeout=consts.cache.DEFAULT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_srs(self) -> list[dict[str, typing.Any]]:
return_list: list[dict[str, typing.Any]] = []
for srId in self.SR.get_all():
# Only valid SR shared, non iso
name_label = self.SR.get_name_label(srId)
# Skip non valid...
if self.SR.get_content_type(srId) == 'iso' or self.SR.get_shared(srId) is False or name_label == '':
continue
def list_srs(self) -> list[xen_types.StorageInfo]:
return_list: list[xen_types.StorageInfo] = []
for sr_id, sr_raw in typing.cast(dict[str, typing.Any], self.SR.get_all_records()).items():
sr = xen_types.StorageInfo.from_dict(sr_raw, sr_id)
if sr.is_usable():
return_list.append(sr)
valid = True
allowed_ops = self.SR.get_allowed_operations(srId)
for v in ['vdi_create', 'vdi_clone', 'vdi_snapshot', 'vdi_destroy']:
if v not in allowed_ops:
valid = False
if valid:
return_list.append(
{
'id': srId,
'name': name_label,
'size': XenServer.to_mb(self.SR.get_physical_size(srId)),
'used': XenServer.to_mb(self.SR.get_physical_utilisation(srId)),
}
)
return return_list
@cached(prefix='xen_sr', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_sr_info(self, srid: str) -> dict[str, typing.Any]:
return {
'id': srid,
'name': self.SR.get_name_label(srid),
'size': XenServer.to_mb(self.SR.get_physical_size(srid)),
'used': XenServer.to_mb(self.SR.get_physical_utilisation(srid)),
}
def get_sr_info(self, sr_opaque_ref: str) -> xen_types.StorageInfo:
return xen_types.StorageInfo.from_dict(self.SR.get_record(sr_opaque_ref), sr_opaque_ref)
@cached(prefix='xen_nets', timeout=consts.cache.DEFAULT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_networks(self, **kwargs: typing.Any) -> list[dict[str, typing.Any]]:
return_list: list[dict[str, typing.Any]] = []
for netId in self.network.get_all():
if self.network.get_other_config(netId).get('is_host_internal_management_network', False) is False:
return_list.append(
{
'id': netId,
'name': self.network.get_name_label(netId),
}
)
def list_networks(self, **kwargs: typing.Any) -> list[xen_types.NetworkInfo]:
return_list: list[xen_types.NetworkInfo] = []
for netid in self.network.get_all():
netinfo = xen_types.NetworkInfo.from_dict(self.network.get_record(netid), netid)
if netinfo.is_host_internal_management_network is False:
return_list.append(netinfo)
return return_list
@cached(prefix='xen_net', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_network_info(self, net_id: str) -> dict[str, typing.Any]:
return {'id': net_id, 'name': self.network.get_name_label(net_id)}
def get_network_info(self, network_opaque_ref: str) -> xen_types.NetworkInfo:
return xen_types.NetworkInfo.from_dict(self.network.get_record(network_opaque_ref), network_opaque_ref)
@cached(prefix='xen_vms', timeout=consts.cache.DEFAULT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_machines(self) -> list[dict[str, typing.Any]]:
return_list: list[dict[str, typing.Any]] = []
try:
vms = self.VM.get_all()
for vm in vms:
try:
# if self.VM.get_is_a_template(vm): # Sample set_tags, easy..
# self.VM.set_tags(vm, ['template'])
# continue
if self.VM.get_is_control_domain(vm) or self.VM.get_is_a_template(vm):
continue
def list_vms(self) -> list[xen_types.VMInfo]:
return_list: list[xen_types.VMInfo] = []
return_list.append({'id': vm, 'name': self.VM.get_name_label(vm)})
except Exception as e:
logger.warning('VM %s returned error %s', vm, str(e))
continue
try:
for vm_id, vm_raw in typing.cast(dict[str, typing.Any], self.VM.get_all_records()).items():
vm = xen_types.VMInfo.from_dict(vm_raw, vm_id)
if vm.is_usable():
return_list.append(vm)
return return_list
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
except Exception as e:
raise XenException(str(e))
def get_machine_power_state(self, vmId: str) -> str:
try:
power_state = self.VM.get_power_state(vmId)
logger.debug('Power state of %s: %s', vmId, power_state)
return power_state
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenException(str(e))
@cached(prefix='xen_vm', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_machine_info(self, vmid: str, **kwargs: typing.Any) -> dict[str, typing.Any]:
def get_vm_info(self, vm_opaque_ref: str, **kwargs: typing.Any) -> xen_types.VMInfo:
try:
return self.VM.get_record(vmid)
return xen_types.VMInfo.from_dict(self.VM.get_record(vm_opaque_ref), vm_opaque_ref)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
@cached(prefix='xen_vm_f', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_machine_folder(self, vmid: str, **kwargs: typing.Any) -> str:
def get_vm_folder(self, vmid: str, **kwargs: typing.Any) -> str:
try:
other_config = self.VM.get_other_config(vmid)
return other_config.get('folder', '')
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def start_machine(self, vmId: str, as_async: bool = True) -> typing.Optional[str]:
vmState = self.get_machine_power_state(vmId)
if vmState == XenPowerState.running:
return None # Already powered on
if vmState == XenPowerState.suspended:
return self.resume_machine(vmId, as_async)
return (self.Async if as_async else self).VM.start(vmId, False, False)
def stop_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
vmState = self.get_machine_power_state(vmid)
if vmState in (XenPowerState.suspended, XenPowerState.halted):
return None # Already powered off
return (self.Async if as_async else self).VM.hard_shutdown(vmid)
def reset_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
vmState = self.get_machine_power_state(vmid)
if vmState in (XenPowerState.suspended, XenPowerState.halted):
return None # Already powered off, cannot reboot
return (self.Async if as_async else self).VM.hard_reboot(vmid)
def can_suspend_machine(self, vmid: str) -> bool:
operations = self.VM.get_allowed_operations(vmid)
logger.debug('Operations: %s', operations)
return 'suspend' in operations
def suspend_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
vm_state = self.get_machine_power_state(vmid)
if vm_state == XenPowerState.suspended:
def start_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_running():
return None
return (self.Async if as_async else self).VM.suspend(vmid)
def resume_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
vm_state = self.get_machine_power_state(vmid)
if vm_state != XenPowerState.suspended:
if vminfo.power_state == xen_types.PowerState.SUSPENDED:
return self.resume_vm(vm_opaque_ref, as_async)
return (self.Async if as_async else self).VM.start(vm_opaque_ref, False, False)
def stop_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.resume(vmid, False, False)
def shutdown_machine(self, vmid: str, as_async: bool = True) -> typing.Optional[str]:
vm_state = self.get_machine_power_state(vmid)
if vm_state in (XenPowerState.suspended, XenPowerState.halted):
return (self.Async if as_async else self).VM.hard_shutdown(vm_opaque_ref)
def reset_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.clean_shutdown(vmid)
def clone_machine(self, vmId: str, target_name: str, target_sr: typing.Optional[str] = None) -> str:
return (self.Async if as_async else self).VM.hard_reboot(vm_opaque_ref)
def suspend_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
if vminfo.supports_suspend() is False:
# Shutdown machine if it can't be suspended
return self.shutdown_vm(vm_opaque_ref, as_async)
return (self.Async if as_async else self).VM.suspend(vm_opaque_ref)
def resume_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_running():
return None
if vminfo.power_state.is_suspended() is False:
return self.start_vm(vm_opaque_ref, as_async)
return (self.Async if as_async else self).VM.resume(vm_opaque_ref, False, False)
def shutdown_vm(self, vm_opaque_ref: str, as_async: bool = True) -> typing.Optional[str]:
vminfo = self.get_vm_info(vm_opaque_ref)
if vminfo.power_state.is_stopped():
return None
return (self.Async if as_async else self).VM.clean_shutdown(vm_opaque_ref)
def clone_vm(self, vm_opaque_ref: str, target_name: str, target_sr: typing.Optional[str] = None) -> str:
"""
If target_sr is NONE:
Clones the specified VM, making a new VM.
@ -477,24 +354,28 @@ class XenServer: # pylint: disable=too-many-public-methods
'full disks' - i.e. not part of a CoW chain.
This function can only be called when the VM is in the Halted State.
"""
logger.debug('Cloning VM %s to %s on sr %s', vmId, target_name, target_sr)
operations = self.VM.get_allowed_operations(vmId)
logger.debug('Cloning VM %s to %s on sr %s', vm_opaque_ref, target_name, target_sr)
operations = self.VM.get_allowed_operations(vm_opaque_ref)
logger.debug('Allowed operations: %s', operations)
try:
if target_sr:
if 'copy' not in operations:
raise XenException('Copy is not supported for this machine (maybe it\'s powered on?)')
task = self.Async.VM.copy(vmId, target_name, target_sr)
raise exceptions.XenException(
'Copy is not supported for this machine (maybe it\'s powered on?)'
)
task = self.Async.VM.copy(vm_opaque_ref, target_name, target_sr)
else:
if 'clone' not in operations:
raise XenException('Clone is not supported for this machine (maybe it\'s powered on?)')
task = self.Async.VM.clone(vmId, target_name)
raise exceptions.XenException(
'Clone is not supported for this machine (maybe it\'s powered on?)'
)
task = self.Async.VM.clone(vm_opaque_ref, target_name)
return task
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def remove_machine(self, vmid: str) -> None:
def delete_vm(self, vmid: str) -> None:
logger.debug('Removing machine')
vdis_to_delete: list[str] = []
for vdb in self.VM.get_VBDs(vmid):
@ -516,10 +397,10 @@ class XenServer: # pylint: disable=too-many-public-methods
for vdi in vdis_to_delete:
self.VDI.destroy(vdi)
def configure_machine(
def configure_vm(
self,
vmid: str,
mac: typing.Optional[dict[str, str]] = None,
net_info: typing.Optional[dict[str, str]] = None,
memory: typing.Optional[typing.Union[str, int]] = None,
) -> None:
"""
@ -532,16 +413,16 @@ class XenServer: # pylint: disable=too-many-public-methods
# If requested mac address change
try:
if mac is not None:
if net_info is not None:
all_VIFs: list[str] = self.VM.get_VIFs(vmid)
if not all_VIFs:
raise XenException('No Network interfaces found!')
raise exceptions.XenException('No Network interfaces found!')
found = (all_VIFs[0], self.VIF.get_record(all_VIFs[0]))
for vifId in all_VIFs:
vif = self.VIF.get_record(vifId)
logger.info('VIF: %s', vif)
if vif['network'] == mac['network']:
if vif['network'] == net_info['network']:
found = (vifId, vif)
break
@ -549,8 +430,8 @@ class XenServer: # pylint: disable=too-many-public-methods
vifId, vif = found
self.VIF.destroy(vifId)
vif['MAC'] = mac['mac']
vif['network'] = mac['network']
vif['MAC'] = net_info['mac']
vif['network'] = net_info['network']
vif['MAC_autogenerated'] = False
self.VIF.create(vif)
@ -561,7 +442,7 @@ class XenServer: # pylint: disable=too-many-public-methods
memory = str(int(memory) * 1024 * 1024)
self.VM.set_memory_limits(vmid, memory, memory, memory, memory)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def get_first_ip(
self, vmid: str, ip_type: typing.Optional[typing.Union[typing.Literal['4'], typing.Literal['6']]] = None
@ -584,7 +465,7 @@ class XenServer: # pylint: disable=too-many-public-methods
return networks[net_name]
return ''
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def get_first_mac(self, vmid: str) -> str:
"""Returns the first MAC of the machine, or '' if not found"""
@ -595,9 +476,9 @@ class XenServer: # pylint: disable=too-many-public-methods
vif = self.VIF.get_record(vifs[0])
return vif['MAC']
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def provision_machine(self, vmid: str, as_async: bool = True) -> str:
def provision_vm(self, vmid: str, as_async: bool = True) -> str:
tags = self.VM.get_tags(vmid)
try:
del tags[tags.index(TAG_TEMPLATE)]
@ -614,22 +495,24 @@ class XenServer: # pylint: disable=too-many-public-methods
try:
return self.Async.VM.snapshot(vmid, name)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def restore_snapshot(self, snapshot_id: str) -> str:
try:
return self.Async.VM.snapshot_revert(snapshot_id)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def remove_snapshot(self, snapshot_id: str) -> str:
try:
return self.Async.VM.destroy(snapshot_id)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
@cached(prefix='xen_snapshots', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_snapshots(self, vmid: str, full_info: bool = False, **kwargs: typing.Any) -> list[dict[str, typing.Any]]:
def list_snapshots(
self, vmid: str, full_info: bool = True, **kwargs: typing.Any
) -> list[xen_types.VMInfo]:
"""Returns a list of snapshots for the specified VM, sorted by snapshot_time in descending order.
(That is, the most recent snapshot is first in the list.)
@ -638,27 +521,21 @@ class XenServer: # pylint: disable=too-many-public-methods
full_info: If True, return full information about each snapshot. If False, return only the snapshot ID
Returns:
A list of dictionaries, each containing the following keys:
id: The snapshot ID.
name: The snapshot name.
A list of snapshots for the specified VM, sorted by snapshot_time in descending order.
"""
try:
snapshots = self.VM.get_snapshots(vmid)
if not full_info:
return [{'id': snapshot for snapshot in snapshots}]
return [xen_types.VMInfo.empty(snapshot) for snapshot in snapshots]
# Return full info, thatis, name, id and snapshot_time
return_list: list[dict[str, typing.Any]] = []
for snapshot in snapshots:
return_list.append(
{
'id': snapshot,
'name': self.VM.get_name_label(snapshot),
'snapshot_time': self.VM.get_snapshot_time(snapshot),
}
)
return sorted(return_list, key=lambda x: x['snapshot_time'], reverse=True)
return sorted([
self.get_vm_info(snapshot)
for snapshot in snapshots
], key=lambda x: x.snapshot_time, reverse=True)
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
@cached(prefix='xen_folders', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_folders(self, **kwargs: typing.Any) -> list[str]:
@ -668,22 +545,16 @@ class XenServer: # pylint: disable=too-many-public-methods
A list of 'folders' (organizations, str) in the XenServer
"""
folders: set[str] = set('/') # Add root folder for machines without folder
for vm in self.list_machines():
other_config = self.VM.get_other_config(vm['id'])
folder: typing.Optional[str] = other_config.get('folder')
if folder:
folders.add(folder)
for vm in self.list_vms():
if vm.folder:
folders.add(vm.folder)
return sorted(folders)
def get_machines_from_folder(
self, folder: str, retrieve_names: bool = False
) -> list[dict[str, typing.Any]]:
result_list: list[dict[str, typing.Any]] = []
for vm in self.list_machines():
other_config = self.VM.get_other_config(vm['id'])
if other_config.get('folder', '/') == folder:
if retrieve_names:
vm['name'] = self.VM.get_name_label(vm['id'])
def list_vms_from_folder(self, folder: str) -> list[xen_types.VMInfo]:
result_list: list[xen_types.VMInfo] = []
for vm in self.list_vms():
if vm.folder == folder:
result_list.append(vm)
return result_list
@ -692,7 +563,7 @@ class XenServer: # pylint: disable=too-many-public-methods
operations = self.VM.get_allowed_operations(vmId)
logger.debug('Allowed operations: %s', operations)
if 'make_into_template' not in operations:
raise XenException('Convert in template is not supported for this machine')
raise exceptions.XenException('Convert in template is not supported for this machine')
self.VM.set_is_a_template(vmId, True)
# Apply that is an "UDS Template" taggint it
@ -710,13 +581,13 @@ class XenServer: # pylint: disable=too-many-public-methods
except Exception: # nosec: Can't set shadowMultiplier, nothing happens
pass
except XenAPI.Failure as e:
raise XenFailure(e.details)
raise exceptions.XenFailure(e.details)
def remove_template(self, templateId: str) -> None:
self.remove_machine(templateId)
def delete_template(self, template_opaque_ref: str) -> None:
self.delete_vm(template_opaque_ref)
def start_deploy_from_template(self, templateId: str, target_name: str) -> str:
def start_deploy_from_template(self, template_opaque_ref: str, target_name: str) -> str:
"""
After cloning template, we must deploy the VM so it's a full usable VM
"""
return self.clone_machine(templateId, target_name)
return self.clone_vm(template_opaque_ref, target_name)

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import XenAPI # pyright: ignore
from uds.core.services.generics import exceptions
logger = logging.getLogger(__name__)
class XenFault(exceptions.Error):
pass
class XenFailure(XenAPI.Failure, XenFault):
exBadVmPowerState = 'VM_BAD_POWER_STATE'
exVmMissingPVDrivers = 'VM_MISSING_PV_DRIVERS'
exHandleInvalid = 'HANDLE_INVALID'
exHostIsSlave = 'HOST_IS_SLAVE'
exSRError = 'SR_BACKEND_FAILURE_44'
def __init__(self, details: typing.Optional[list[typing.Any]] = None):
details = [] if details is None else details
super(XenFailure, self).__init__(details)
def isHandleInvalid(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exHandleInvalid
def needs_xen_tools(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exVmMissingPVDrivers
def bad_power_state(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exBadVmPowerState
def is_slave(self) -> bool:
return typing.cast(typing.Any, self.details[0]) == XenFailure.exHostIsSlave
def as_human_readable(self) -> str:
try:
error_list = {
XenFailure.exBadVmPowerState: 'Machine state is invalid for requested operation (needs {2} and state is {3})',
XenFailure.exVmMissingPVDrivers: 'Machine needs Xen Server Tools to allow requested operation',
XenFailure.exHostIsSlave: 'The connected host is an slave, try to connect to {1}',
XenFailure.exSRError: 'Error on SR: {2}',
XenFailure.exHandleInvalid: 'Invalid reference to {1}',
}
err = error_list.get(typing.cast(typing.Any, self.details[0]), 'Error {0}')
return err.format(*typing.cast(list[typing.Any], self.details))
except Exception:
return 'Unknown exception: {0}'.format(self.details)
def __str__(self) -> str:
return self.as_human_readable()
class XenException(XenFault):
def __init__(self, message: typing.Any):
XenFault.__init__(self, message)
logger.debug('Exception create: %s', message)

View File

@ -0,0 +1,490 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import enum
import dataclasses
import typing
class PowerState(enum.StrEnum):
HALTED = 'Halted'
RUNNING = 'Running'
SUSPENDED = 'Suspended'
PAUSED = 'Paused'
# Internal UNKNOW state
UNKNOW = 'Unknow'
def is_running(self):
return self == PowerState.RUNNING
def is_stopped(self):
return self in (PowerState.HALTED, PowerState.SUSPENDED)
def is_suspended(self):
return self == PowerState.SUSPENDED
@staticmethod
def from_str(value: str):
try:
return PowerState(value.capitalize())
except ValueError:
return PowerState.UNKNOW
class TaskStatus(enum.StrEnum):
"""
Values:
pending task is in progress
success task was completed successfully
failure task has failed
cancelling task is being cancelled
cancelled task has been cancelled
"""
PENDING = 'pending'
SUCCESS = 'success'
FAILURE = 'failure'
CANCELLING = 'cancelling'
CANCELLED = 'cancelled'
# Internal UNKNOW state
UNKNOW = 'unknow'
def is_done(self):
return self in (TaskStatus.SUCCESS, TaskStatus.FAILURE, TaskStatus.CANCELLED)
def is_success(self):
return self == TaskStatus.SUCCESS
def is_failure(self):
return self == TaskStatus.FAILURE
@staticmethod
def from_str(value: str):
try:
return TaskStatus(value.lower())
except ValueError:
return TaskStatus.UNKNOW
class StorageOperations(enum.StrEnum):
"""
Values:
scan Scanning backends for new or deleted VDIs
destroy Destroying the SR
forget Forgetting about SR
plug Plugging a PBD into this SR
unplug Unplugging a PBD from this SR
update Refresh the fields on the SR
vdi_create Creating a new VDI
vdi_introduce Introducing a new VDI
vdi_destroy Destroying a VDI
vdi_resize Resizing a VDI
vdi_clone Cloneing a VDI
vdi_snapshot Snapshotting a VDI
vdi_mirror Mirroring a VDI
vdi_enable_cbt Enabling changed block tracking for a VDI
vdi_disable_cbt Disabling changed block tracking for a VDI
vdi_data_destroy Deleting the data of the VDI
vdi_list_changed_blocks Exporting a bitmap that shows the changed blocks between two VDIs
vdi_set_on_boot Setting the on_boot field of the VDI
pbd_create Creating a PBD for this SR
pbd_destroy Destroying one of this SR's PBDs
"""
SCAN = 'scan'
DESTROY = 'destroy'
FORGET = 'forget'
PLUG = 'plug'
UNPLUG = 'unplug'
UPDATE = 'update'
VDI_CREATE = 'vdi_create'
VDI_INTRODUCE = 'vdi_introduce'
VDI_DESTROY = 'vdi_destroy'
VDI_RESIZE = 'vdi_resize'
VDI_CLONE = 'vdi_clone'
VDI_SNAPSHOT = 'vdi_snapshot'
VDI_MIRROR = 'vdi_mirror'
VDI_ENABLE_CBT = 'vdi_enable_cbt'
VDI_DISABLE_CBT = 'vdi_disable_cbt'
VDI_DATA_DESTROY = 'vdi_data_destroy'
VDI_LIST_CHANGED_BLOCKS = 'vdi_list_changed_blocks'
VDI_SET_ON_BOOT = 'vdi_set_on_boot'
PBD_CREATE = 'pbd_create'
PBD_DESTROY = 'pbd_destroy'
# Internal UNKNOW storage operation
UNKNOW = 'unknow'
@staticmethod
def from_str(value: str):
try:
return StorageOperations(value.lower())
except ValueError:
return StorageOperations.UNKNOW
@staticmethod
def is_usable(values: typing.Iterable['StorageOperations']) -> bool:
# To be usable, must contain ALL required operations
# Where the required operations are:
# VDI_CREATE, VDI_CLONE, VDI_SNAPSHOT, VDI_DESTROY
return all(
op in values
for op in (
StorageOperations.VDI_CREATE,
StorageOperations.VDI_CLONE,
StorageOperations.VDI_SNAPSHOT,
StorageOperations.VDI_DESTROY,
)
)
class VMOperations(enum.StrEnum):
"""
Values:
snapshot refers to the operation "snapshot"
clone refers to the operation "clone"
copy refers to the operation "copy"
create_template refers to the operation "create_template"
revert refers to the operation "revert"
checkpoint refers to the operation "checkpoint"
snapshot_with_quiesce refers to the operation "snapshot_with_quiesce"
provision refers to the operation "provision"
start refers to the operation "start"
start_on refers to the operation "start_on"
pause refers to the operation "pause"
unpause refers to the operation "unpause"
clean_shutdown refers to the operation "clean_shutdown"
clean_reboot refers to the operation "clean_reboot"
hard_shutdown refers to the operation "hard_shutdown"
power_state_reset refers to the operation "power_state_reset"
hard_reboot refers to the operation "hard_reboot"
suspend refers to the operation "suspend"
csvm refers to the operation "csvm"
resume refers to the operation "resume"
resume_on refers to the operation "resume_on"
pool_migrate refers to the operation "pool_migrate"
migrate_send refers to the operation "migrate_send"
get_boot_record refers to the operation "get_boot_record"
send_sysrq refers to the operation "send_sysrq"
send_trigger refers to the operation "send_trigger"
query_services refers to the operation "query_services"
shutdown refers to the operation "shutdown"
call_plugin refers to the operation "call_plugin"
changing_memory_live Changing the memory settings
awaiting_memory_live Waiting for the memory settings to change
changing_dynamic_range Changing the memory dynamic range
changing_static_range Changing the memory static range
changing_memory_limits Changing the memory limits
changing_shadow_memory Changing the shadow memory for a halted VM.
changing_shadow_memory_live Changing the shadow memory for a running VM.
changing_VCPUs Changing VCPU settings for a halted VM.
changing_VCPUs_live Changing VCPU settings for a running VM.
changing_NVRAM Changing NVRAM for a halted VM.
assert_operation_valid
data_source_op Add, remove, query or list data sources
update_allowed_operations
make_into_template Turning this VM into a template
import importing a VM from a network stream
export exporting a VM to a network stream
metadata_export exporting VM metadata to a network stream
reverting Reverting the VM to a previous snapshotted state
destroy refers to the act of uninstalling the VM
create_vtpm Creating and adding a VTPM to this VM
"""
SNAPSHOOT = 'snapshot'
CLONE = 'clone'
COPY = 'copy'
CREATE_TEMPLATE = 'create_template'
REVERT = 'revert'
CHECKPOINT = 'checkpoint'
SNAPSHOT_WITH_QUIESCE = 'snapshot_with_quiesce'
PROVISION = 'provision'
START = 'start'
START_ON = 'start_on'
PAUSE = 'pause'
UNPAUSE = 'unpause'
CLEAN_SHUTDOWN = 'clean_shutdown'
CLEAN_REBOOT = 'clean_reboot'
HARD_SHUTDOWN = 'hard_shutdown'
POWER_STATE_RESET = 'power_state_reset'
HARD_REBOOT = 'hard_reboot'
SUSPEND = 'suspend'
CSVM = 'csvm'
RESUME = 'resume'
RESUME_ON = 'resume_on'
POOL_MIGRATE = 'pool_migrate'
MIGRATE_SEND = 'migrate_send'
GET_BOOT_RECORD = 'get_boot_record'
SEND_SYSRQ = 'send_sysrq'
SEND_TRIGGER = 'send_trigger'
QUERY_SERVICES = 'query_services'
SHUTDOWN = 'shutdown'
CALL_PLUGIN = 'call_plugin'
CHANGING_MEMORY_LIVE = 'changing_memory_live'
AWAITING_MEMORY_LIVE = 'awaiting_memory_live'
CHANGING_DYNAMIC_RANGE = 'changing_dynamic_range'
CHANGING_STATIC_RANGE = 'changing_static_range'
CHANGING_MEMORY_LIMITS = 'changing_memory_limits'
CHANGING_SHADOW_MEMORY = 'changing_shadow_memory'
CHANGING_SHADOW_MEMORY_LIVE = 'changing_shadow_memory_live'
CHANGING_VCPUS = 'changing_VCPUs'
CHANGING_VCPUS_LIVE = 'changing_VCPUs_live'
CHANGING_NVRAM = 'changing_NVRAM'
ASSERT_OPERATION_VALID = 'assert_operation_valid'
DATA_SOURCE_OP = 'data_source_op'
UPDATE_ALLOWED_OPERATIONS = 'update_allowed_operations'
MAKE_INTO_TEMPLATE = 'make_into_template'
IMPORT = 'import'
EXPORT = 'export'
METADATA_EXPORT = 'metadata_export'
REVERTING = 'reverting'
DESTROY = 'destroy'
CREATE_VTPM = 'create_vtpm'
# Internal UNKNOW VM operation
UNKNOW = 'unknow'
@staticmethod
def from_str(value: str):
try:
return VMOperations(value.lower())
except ValueError:
return VMOperations.UNKNOW
@dataclasses.dataclass
class StorageInfo:
opaque_ref: str
uuid: str
name: str
description: str
allowed_operations: typing.List[StorageOperations]
# current_operations not used
VDIs: list[str] # List of VDIs UUIDs
PBDs: list[str] # List of PDBs UUIDs
virtual_allocation: int # Virtual size of the storage
physical_utilisation: int # Used size of the storage
physical_size: int # Total size of the storage
type: str # Type of the storage
content_type: str # Content type of the storage
shared: bool # Shared storage
@staticmethod
def from_dict(data: dict[str, typing.Any], opaque_ref: str) -> 'StorageInfo':
return StorageInfo(
opaque_ref=opaque_ref,
uuid=data['uuid'],
name=data['name_label'],
description=data['name_description'],
allowed_operations=[StorageOperations.from_str(op) for op in data['allowed_operations']],
VDIs=typing.cast(list[str], data.get('VDIs', '')),
PBDs=typing.cast(list[str], data.get('PBDs', '')),
virtual_allocation=int(data['virtual_allocation']),
physical_utilisation=int(data['physical_utilisation']),
physical_size=int(data['physical_size']),
type=data['type'],
content_type=data['content_type'],
shared=data['shared'],
)
def is_usable(self) -> bool:
if self.type == 'iso' or self.shared is False or self.name == '':
return False
return StorageOperations.is_usable(self.allowed_operations)
@dataclasses.dataclass
class VMInfo:
opaque_ref: str
uuid: str
name: str
description: str
power_state: PowerState
is_control_domain: bool
is_a_template: bool
snapshot_time: datetime.datetime
snapshots: list[str]
allowed_operations: typing.List[VMOperations]
# Other useful configuration
folder: str
@staticmethod
def from_dict(data: dict[str, typing.Any], opaque_ref: str) -> 'VMInfo':
try:
snapshot_time = datetime.datetime.fromisoformat(data['snapshot_time'].value)
except ValueError:
snapshot_time = datetime.datetime.now()
other_config = typing.cast(dict[str, str], data.get('other_config', {}))
return VMInfo(
opaque_ref=opaque_ref,
uuid=data['uuid'],
name=data['name_label'],
description=data['name_description'],
power_state=PowerState.from_str(data['power_state']),
is_control_domain=data['is_control_domain'],
is_a_template=data['is_a_template'],
snapshot_time=snapshot_time,
snapshots=typing.cast(list[str], data.get('snapshots', [])),
allowed_operations=[VMOperations.from_str(op) for op in data['allowed_operations']],
folder=other_config.get('folder', ''),
)
@staticmethod
def empty(opaque_ref: str) -> 'VMInfo':
return VMInfo(
opaque_ref=opaque_ref,
uuid='',
name='Unknown',
description='Unknown VM',
power_state=PowerState.UNKNOW,
is_control_domain=False,
is_a_template=False,
snapshot_time=datetime.datetime.now(),
snapshots=[],
allowed_operations=[],
folder='',
)
def is_usable(self) -> bool:
if self.is_control_domain or self.is_a_template:
return False
return True
def supports_suspend(self) -> bool:
return VMOperations.SUSPEND in self.allowed_operations
@dataclasses.dataclass
class NetworkInfo:
opaque_ref: str
uuid: str
name: str
description: str
managed: bool
VIFs: list[str] # List of VIFs opaques
PIFs: list[str] # List of PIFs opaques
# Other useful configuration
is_guest_installer_network: bool
is_host_internal_management_network: bool
ip_begin: str
ip_end: str
netmask: str
@staticmethod
def from_dict(data: dict[str, typing.Any], opaque_ref: str) -> 'NetworkInfo':
other_config = typing.cast(dict[str, typing.Any], data.get('other_config', {}))
return NetworkInfo(
opaque_ref=opaque_ref,
uuid=data['uuid'],
name=data['name_label'],
description=data['name_description'],
managed=data['managed'],
VIFs=typing.cast(list[str], data.get('VIFs', [])),
PIFs=typing.cast(list[str], data.get('PIFs', [])),
is_guest_installer_network=other_config.get('is_guest_installer_network', False),
is_host_internal_management_network=other_config.get('is_host_internal_management_network', False),
ip_begin=other_config.get('ip_begin', ''),
ip_end=other_config.get('ip_end', ''),
netmask=other_config.get('netmask', ''),
)
@dataclasses.dataclass
class TaskInfo:
opaque_ref: str
uuid: str
name: str
description: str
created: datetime.datetime
finished: datetime.datetime
status: TaskStatus
result: str
progress: float
@staticmethod
def from_dict(data: dict[str, typing.Any], opaque_ref: str) -> 'TaskInfo':
result = data.get('result', '')
if result and result.startswith('<value>'):
result = result[7:-8]
try:
created = datetime.datetime.fromisoformat(data['created'].value)
except ValueError:
created = datetime.datetime.now()
try:
finished = datetime.datetime.fromisoformat(data['finished'].value)
except ValueError:
finished = created
return TaskInfo(
opaque_ref=opaque_ref,
uuid=data['uuid'],
name=data['name_label'],
description=data['name_description'],
created=created,
finished=finished,
status=TaskStatus.from_str(data['status']),
result=result,
progress=float(data['progress']),
)
@staticmethod
def unknown_task(opaque_ref: str) -> 'TaskInfo':
return TaskInfo(
opaque_ref=opaque_ref,
uuid='',
name='Unknown',
description='Unknown task',
created=datetime.datetime.now(),
finished=datetime.datetime.now(),
status=TaskStatus.UNKNOW,
result='',
progress=0.0,
)
def is_done(self) -> bool:
return self.status.is_done()
def is_success(self) -> bool:
return self.status.is_success()
def is_failure(self) -> bool:
return self.status.is_failure()