mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-22 22:03:54 +03:00
Improving A LOT the ovirt connection
Adding test for ovirt
This commit is contained in:
parent
760bb169bf
commit
cbacc20909
@ -41,6 +41,7 @@ from uds.core.managers.userservice import UserServiceManager
|
||||
from uds.core.util import autoserializable, log
|
||||
|
||||
from .jobs import OVirtDeferredRemoval
|
||||
from .ovirt import types as ov_types
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
@ -78,10 +79,15 @@ class Operation(enum.IntEnum):
|
||||
return Operation.opUnknown
|
||||
|
||||
|
||||
UP_STATES: typing.Final[set[str]] = {'up', 'reboot_in_progress', 'powering_up', 'restoring_state'}
|
||||
UP_STATES: typing.Final[set[ov_types.VMStatus]] = {
|
||||
ov_types.VMStatus.UP,
|
||||
ov_types.VMStatus.REBOOT_IN_PROGRESS,
|
||||
ov_types.VMStatus.POWERING_UP,
|
||||
ov_types.VMStatus.RESTORING_STATE,
|
||||
}
|
||||
|
||||
|
||||
class OVirtLinkedDeployment(services.UserService, autoserializable.AutoSerializable):
|
||||
class OVirtLinkedUserService(services.UserService, autoserializable.AutoSerializable):
|
||||
"""
|
||||
This class generates the user consumable elements of the service tree.
|
||||
|
||||
@ -201,12 +207,12 @@ class OVirtLinkedDeployment(services.UserService, autoserializable.AutoSerializa
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
vminfo = self.service().provider().api.get_machine_info(self._vmid)
|
||||
|
||||
if state == 'unknown':
|
||||
return self._error('Machine is not available anymore')
|
||||
if vminfo.status == ov_types.VMStatus.UNKNOWN:
|
||||
return self._error('Machine not found')
|
||||
|
||||
if state not in UP_STATES:
|
||||
if vminfo.status not in UP_STATES:
|
||||
self._queue = [Operation.START, Operation.FINISH]
|
||||
return self._execute_queue()
|
||||
|
||||
@ -222,7 +228,7 @@ class OVirtLinkedDeployment(services.UserService, autoserializable.AutoSerializa
|
||||
o oVirt, reset operation just shutdowns it until v3 support is removed
|
||||
"""
|
||||
if self._vmid != '':
|
||||
self.service().stop_machine(self._vmid)
|
||||
self.service().provider().api.stop_machine(self._vmid)
|
||||
|
||||
def get_console_connection(
|
||||
self,
|
||||
@ -284,28 +290,25 @@ if sys.platform == 'win32':
|
||||
Operation.FINISH,
|
||||
]
|
||||
|
||||
def _check_machine_state(self, check_state: collections.abc.Iterable[str]) -> types.states.TaskState:
|
||||
def _check_machine_state(
|
||||
self, check_state: collections.abc.Iterable[ov_types.VMStatus]
|
||||
) -> types.states.TaskState:
|
||||
logger.debug(
|
||||
'Checking that state of machine %s (%s) is %s',
|
||||
self._vmid,
|
||||
self._name,
|
||||
check_state,
|
||||
)
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
|
||||
# If we want to check an state and machine does not exists (except in case that we whant to check this)
|
||||
if state == 'unknown' and check_state != 'unknown':
|
||||
vm_info = self.service().provider().api.get_machine_info(self._vmid)
|
||||
if vm_info.status == ov_types.VMStatus.UNKNOWN:
|
||||
return self._error('Machine not found')
|
||||
|
||||
ret = types.states.TaskState.RUNNING
|
||||
if isinstance(check_state, (list, tuple)):
|
||||
# if iterable...
|
||||
for cks in check_state:
|
||||
if state == cks:
|
||||
if vm_info.status == cks:
|
||||
ret = types.states.TaskState.FINISHED
|
||||
break
|
||||
else:
|
||||
if state == check_state:
|
||||
ret = types.states.TaskState.FINISHED
|
||||
|
||||
return ret
|
||||
|
||||
@ -408,9 +411,7 @@ if sys.platform == 'win32':
|
||||
) # oVirt don't let us to create machines with more than 15 chars!!!
|
||||
comments = 'UDS Linked clone'
|
||||
|
||||
self._vmid = self.service().deploy_from_template(name, comments, template_id)
|
||||
if not self._vmid:
|
||||
raise Exception('Can\'t create machine')
|
||||
self._vmid = self.service().deploy_from_template(name, comments, template_id).id
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -418,16 +419,15 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Removes a machine from system
|
||||
"""
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
vminfo = self.service().provider().api.get_machine_info(self._vmid)
|
||||
|
||||
if state == 'unknown':
|
||||
if vminfo.status == ov_types.VMStatus.UNKNOWN:
|
||||
raise Exception('Machine not found')
|
||||
|
||||
if state != 'down':
|
||||
self._push_front_op(Operation.STOP)
|
||||
self._execute_queue()
|
||||
if vminfo.status != ov_types.VMStatus.DOWN:
|
||||
self._push_front_op(Operation.RETRY)
|
||||
else:
|
||||
self.service().remove_machine(self._vmid)
|
||||
self.service().provider().api.remove_machine(self._vmid)
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -435,19 +435,15 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Powers on the machine
|
||||
"""
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
vminfo = self.service().provider().api.get_machine_info(self._vmid)
|
||||
|
||||
if state == 'unknown':
|
||||
if vminfo.status == ov_types.VMStatus.UNKNOWN:
|
||||
raise Exception('Machine not found')
|
||||
|
||||
if state in UP_STATES: # Already started, return
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
if state not in ('down', 'suspended'):
|
||||
self._push_front_op(
|
||||
Operation.RETRY
|
||||
) # Will call "check Retry", that will finish inmediatly and again call this one
|
||||
self.service().start_machine(self._vmid)
|
||||
if vminfo.status not in UP_STATES:
|
||||
self._push_front_op(Operation.RETRY)
|
||||
else:
|
||||
self.service().provider().api.start_machine(self._vmid)
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -455,20 +451,18 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Powers off the machine
|
||||
"""
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
vminfo = self.service().provider().api.get_machine_info(self._vmid)
|
||||
|
||||
if state == 'unknown':
|
||||
if vminfo.status == ov_types.VMStatus.UNKNOWN:
|
||||
raise Exception('Machine not found')
|
||||
|
||||
if state == 'down': # Already stoped, return
|
||||
if vminfo.status == ov_types.VMStatus.DOWN: # Already stoped, return
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
if state not in ('up', 'suspended'):
|
||||
self._push_front_op(
|
||||
Operation.RETRY
|
||||
) # Will call "check Retry", that will finish inmediatly and again call this one
|
||||
if vminfo.status not in UP_STATES:
|
||||
self._push_front_op(Operation.RETRY)
|
||||
else:
|
||||
self.service().stop_machine(self._vmid)
|
||||
self.service().provider().api.stop_machine(self._vmid)
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -476,20 +470,20 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Suspends the machine
|
||||
"""
|
||||
state = self.service().get_machine_state(self._vmid)
|
||||
vminfo = self.service().provider().api.get_machine_info(self._vmid)
|
||||
|
||||
if state == 'unknown':
|
||||
if vminfo.status == ov_types.VMStatus.UNKNOWN:
|
||||
raise Exception('Machine not found')
|
||||
|
||||
if state == 'suspended': # Already suspended, return
|
||||
if vminfo.status == ov_types.VMStatus.SUSPENDED: # Already suspended, return
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
if state != 'up':
|
||||
if vminfo.status not in UP_STATES:
|
||||
self._push_front_op(
|
||||
Operation.RETRY
|
||||
) # Remember here, the return types.states.DeployState.FINISH will make this retry be "poped" right ar return
|
||||
else:
|
||||
self.service().suspend_machine(self._vmid)
|
||||
self.service().provider().api.suspend_machine(self._vmid)
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -497,7 +491,7 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Changes the mac of the first nic
|
||||
"""
|
||||
self.service().update_machine_mac(self._vmid, self.get_unique_id())
|
||||
self.service().provider().api.update_machine_mac(self._vmid, self.get_unique_id())
|
||||
# Fix usb if needed
|
||||
self.service().fix_usb(self._vmid)
|
||||
|
||||
@ -508,7 +502,7 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Checks the state of a deploy for an user or cache
|
||||
"""
|
||||
return self._check_machine_state('down')
|
||||
return self._check_machine_state([ov_types.VMStatus.DOWN])
|
||||
|
||||
def _start_checker(self) -> types.states.TaskState:
|
||||
"""
|
||||
@ -520,19 +514,21 @@ if sys.platform == 'win32':
|
||||
"""
|
||||
Checks if machine has stoped
|
||||
"""
|
||||
return self._check_machine_state('down')
|
||||
return self._check_machine_state([ov_types.VMStatus.DOWN])
|
||||
|
||||
def _suspend_checker(self) -> types.states.TaskState:
|
||||
"""
|
||||
Check if the machine has suspended
|
||||
"""
|
||||
return self._check_machine_state('suspended')
|
||||
return self._check_machine_state(
|
||||
[ov_types.VMStatus.SUSPENDED, ov_types.VMStatus.DOWN]
|
||||
) # Down is also valid for us
|
||||
|
||||
def _remove_checker(self) -> types.states.TaskState:
|
||||
"""
|
||||
Checks if a machine has been removed
|
||||
"""
|
||||
return self._check_machine_state('unknown')
|
||||
return self._check_machine_state([ov_types.VMStatus.UNKNOWN])
|
||||
|
||||
def _mac_checker(self) -> types.states.TaskState:
|
||||
"""
|
||||
@ -567,7 +563,9 @@ if sys.platform == 'win32':
|
||||
}
|
||||
|
||||
try:
|
||||
operation_checker: typing.Optional[typing.Optional[collections.abc.Callable[[], types.states.TaskState]]] = fncs.get(op, None)
|
||||
operation_checker: typing.Optional[
|
||||
typing.Optional[collections.abc.Callable[[], types.states.TaskState]]
|
||||
] = fncs.get(op, None)
|
||||
|
||||
if operation_checker is None:
|
||||
return self._error(f'Unknown operation found at check queue ({op})')
|
||||
@ -665,5 +663,5 @@ if sys.platform == 'win32':
|
||||
self._ip,
|
||||
self._mac,
|
||||
self._vmid,
|
||||
[OVirtLinkedDeployment._op2str(op) for op in self._queue],
|
||||
[OVirtLinkedUserService._op2str(op) for op in self._queue],
|
||||
)
|
||||
|
@ -66,9 +66,9 @@ class OVirtDeferredRemoval(jobs.Job):
|
||||
try:
|
||||
# First check state & stop machine if needed
|
||||
state = instance.api.get_machine_state(vmid)
|
||||
if state in (ov_types.VmStatus.UP, ov_types.VmStatus.POWERING_UP, ov_types.VmStatus.SUSPENDED):
|
||||
if state in (ov_types.VMStatus.UP, ov_types.VMStatus.POWERING_UP, ov_types.VMStatus.SUSPENDED):
|
||||
instance.api.stop_machine(vmid)
|
||||
elif state != ov_types.VmStatus.UNKNOWN: # Machine exists, remove it later
|
||||
elif state != ov_types.VMStatus.UNKNOWN: # Machine exists, remove it later
|
||||
instance.storage.save_to_db('tr' + vmid, vmid, attr1='tRm')
|
||||
|
||||
except Exception as e:
|
||||
@ -102,14 +102,14 @@ class OVirtDeferredRemoval(jobs.Job):
|
||||
# tries to remove in sync mode
|
||||
state = instance.api.get_machine_state(vmid)
|
||||
if state in (
|
||||
ov_types.VmStatus.UP,
|
||||
ov_types.VmStatus.POWERING_UP,
|
||||
ov_types.VmStatus.SUSPENDED,
|
||||
ov_types.VMStatus.UP,
|
||||
ov_types.VMStatus.POWERING_UP,
|
||||
ov_types.VMStatus.SUSPENDED,
|
||||
):
|
||||
instance.api.stop_machine(vmid)
|
||||
return
|
||||
|
||||
if state != ov_types.VmStatus.UNKNOWN: # Machine exists, try to remove it now
|
||||
if state != ov_types.VMStatus.UNKNOWN: # Machine exists, try to remove it now
|
||||
instance.api.remove_machine(vmid)
|
||||
|
||||
# It this is reached, remove check
|
||||
|
@ -35,6 +35,7 @@ import threading
|
||||
import logging
|
||||
import typing
|
||||
import collections.abc
|
||||
import ssl # for getting server certificate
|
||||
|
||||
import ovirtsdk4
|
||||
import ovirtsdk4.types
|
||||
@ -162,11 +163,14 @@ class Client:
|
||||
]
|
||||
|
||||
@decorators.cached(prefix='o-vm', timeout=3, key_helper=_key_helper)
|
||||
def get_machine(self, machine_id: str, **kwargs: typing.Any) -> ov_types.VMInfo:
|
||||
def get_machine_info(self, machine_id: str, **kwargs: typing.Any) -> ov_types.VMInfo:
|
||||
with _access_lock():
|
||||
try:
|
||||
return ov_types.VMInfo.from_data(
|
||||
typing.cast(typing.Any, self.api.system_service().vms_service().service(machine_id).get())
|
||||
)
|
||||
except Exception:
|
||||
return ov_types.VMInfo.missing()
|
||||
|
||||
@decorators.cached(prefix='o-clusters', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=_key_helper)
|
||||
def list_clusters(self, **kwargs: typing.Any) -> list[ov_types.ClusterInfo]:
|
||||
@ -258,7 +262,7 @@ class Client:
|
||||
cluster_id: str,
|
||||
storage_id: str,
|
||||
display_type: str,
|
||||
) -> str:
|
||||
) -> ov_types.TemplateInfo:
|
||||
"""
|
||||
Publish the machine (makes a template from it so we can create COWs) and returns the template id of
|
||||
the creating machine
|
||||
@ -316,36 +320,27 @@ class Client:
|
||||
template = ovirtsdk4.types.Template(name=name, vm=tvm, cluster=tcluster, description=comments)
|
||||
|
||||
# display=display)
|
||||
return ov_types.TemplateInfo.from_data(
|
||||
typing.cast(
|
||||
ovirtsdk4.types.Template,
|
||||
self.api.system_service().templates_service().add(template),
|
||||
)
|
||||
)
|
||||
|
||||
return typing.cast(str, self.api.system_service().templates_service().add(template).id)
|
||||
|
||||
def get_template_state(self, template_id: str) -> str:
|
||||
def get_template_info(self, template_id: str) -> ov_types.TemplateInfo:
|
||||
"""
|
||||
Returns current template state.
|
||||
This method do not uses cache at all (it always tries to get template state from oVirt server)
|
||||
|
||||
Returned values could be:
|
||||
ok
|
||||
locked
|
||||
removed
|
||||
|
||||
(don't know if ovirt returns something more right now, will test what happens when template can't be published)
|
||||
Returns the template info for the given template id
|
||||
"""
|
||||
with _access_lock():
|
||||
try:
|
||||
# we cast constantly to typing.Any because pyright does not recognize the types
|
||||
# But anotate the "real" type on the cast
|
||||
template: typing.Any = typing.cast(
|
||||
return ov_types.TemplateInfo.from_data(
|
||||
typing.cast(
|
||||
ovirtsdk4.types.Template,
|
||||
self.api.system_service().templates_service().service(template_id).get(),
|
||||
)
|
||||
|
||||
if not template:
|
||||
return 'removed'
|
||||
|
||||
return template.status.value
|
||||
)
|
||||
except Exception: # Not found
|
||||
return 'removed'
|
||||
return ov_types.TemplateInfo.missing()
|
||||
|
||||
def deploy_from_template(
|
||||
self,
|
||||
@ -357,7 +352,7 @@ class Client:
|
||||
usb_type: str,
|
||||
memory_mb: int,
|
||||
guaranteed_mb: int,
|
||||
) -> str:
|
||||
) -> ov_types.VMInfo:
|
||||
"""
|
||||
Deploys a virtual machine on selected cluster from selected template
|
||||
|
||||
@ -404,41 +399,20 @@ class Client:
|
||||
usb=usb,
|
||||
) # display=display,
|
||||
|
||||
return typing.cast(str, self.api.system_service().vms_service().add(par).id)
|
||||
return ov_types.VMInfo.from_data(
|
||||
self.api.system_service().vms_service().add(par)
|
||||
)
|
||||
|
||||
def remove_template(self, templateId: str) -> None:
|
||||
def remove_template(self, template_id: str) -> None:
|
||||
"""
|
||||
Removes a template from ovirt server
|
||||
|
||||
Returns nothing, and raises an Exception if it fails
|
||||
"""
|
||||
with _access_lock():
|
||||
self.api.system_service().templates_service().service(templateId).remove()
|
||||
self.api.system_service().templates_service().service(template_id).remove()
|
||||
# This returns nothing, if it fails it raises an exception
|
||||
|
||||
def get_machine_state(self, machine_id: str) -> ov_types.VmStatus:
|
||||
"""
|
||||
Returns current state of a machine (running, suspended, ...).
|
||||
This method do not uses cache at all (it always tries to get machine state from oVirt server)
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine to get status
|
||||
|
||||
Returns:
|
||||
one of this values:
|
||||
unassigned, down, up, powering_up, powered_down,
|
||||
paused, migrating_from, migrating_to, unknown, not_responding,
|
||||
wait_for_launch, reboot_in_progress, saving_state, restoring_state,
|
||||
suspended, image_illegal, image_locked or powering_down
|
||||
Also can return'unknown' if Machine is not known
|
||||
"""
|
||||
with _access_lock():
|
||||
try:
|
||||
vm_info = self.get_machine(machine_id)
|
||||
return vm_info.status
|
||||
except Exception: # machine not found
|
||||
return ov_types.VmStatus.UNKNOWN
|
||||
|
||||
def start_machine(self, machine_id: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to oVirt.
|
||||
@ -535,7 +509,7 @@ class Client:
|
||||
vmu = ovirtsdk4.types.Vm(usb=usb)
|
||||
vms.update(vmu)
|
||||
|
||||
def get_console_connection(self, machine_id: str) -> typing.Optional[types.services.ConsoleConnectionInfo]:
|
||||
def get_console_connection_info(self, machine_id: str) -> typing.Optional[types.services.ConsoleConnectionInfo]:
|
||||
"""
|
||||
Gets the connetion info for the specified machine
|
||||
"""
|
||||
@ -550,10 +524,12 @@ class Client:
|
||||
display = vm.display
|
||||
ticket = vm_service.ticket()
|
||||
|
||||
ca: str = '' # Not known ca
|
||||
# Get host subject
|
||||
cert_subject = ''
|
||||
if display.certificate is not None:
|
||||
cert_subject = display.certificate.subject
|
||||
ca = display.certificate.content
|
||||
else:
|
||||
for i in typing.cast(
|
||||
collections.abc.Iterable[typing.Any], self.api.system_service().hosts_service().list()
|
||||
@ -572,6 +548,12 @@ class Client:
|
||||
# If found
|
||||
if cert_subject != '':
|
||||
break
|
||||
# Try to get certificate from host
|
||||
# Note: This will only work if the certificate is self-signed
|
||||
try:
|
||||
ca = ssl.get_server_certificate((display.address, display.secure_port))
|
||||
except Exception:
|
||||
ca = ''
|
||||
|
||||
return types.services.ConsoleConnectionInfo(
|
||||
type=display.type.value,
|
||||
@ -579,6 +561,7 @@ class Client:
|
||||
port=display.port,
|
||||
secure_port=display.secure_port,
|
||||
cert_subject=cert_subject,
|
||||
ca=ca,
|
||||
ticket=types.services.ConsoleConnectionTicket(value=ticket.value),
|
||||
)
|
||||
|
||||
|
@ -3,7 +3,7 @@ import typing
|
||||
import dataclasses
|
||||
|
||||
|
||||
class VmStatus(enum.StrEnum):
|
||||
class VMStatus(enum.StrEnum):
|
||||
# Adapted from ovirtsdk4
|
||||
DOWN = 'down'
|
||||
IMAGE_LOCKED = 'image_locked'
|
||||
@ -22,11 +22,11 @@ class VmStatus(enum.StrEnum):
|
||||
WAIT_FOR_LAUNCH = 'wait_for_launch'
|
||||
|
||||
@staticmethod
|
||||
def from_str(status: str) -> 'VmStatus':
|
||||
def from_str(status: str) -> 'VMStatus':
|
||||
try:
|
||||
return VmStatus(status)
|
||||
return VMStatus(status)
|
||||
except ValueError:
|
||||
return VmStatus.UNKNOWN
|
||||
return VMStatus.UNKNOWN
|
||||
|
||||
|
||||
class StorageStatus(enum.StrEnum):
|
||||
@ -49,6 +49,7 @@ class StorageStatus(enum.StrEnum):
|
||||
except ValueError:
|
||||
return StorageStatus.UNKNOWN
|
||||
|
||||
|
||||
class StorageType(enum.StrEnum):
|
||||
# Adapted from ovirtsdk4
|
||||
DATA = 'data'
|
||||
@ -69,6 +70,24 @@ class StorageType(enum.StrEnum):
|
||||
return StorageType.UNKNOWN
|
||||
|
||||
|
||||
class TemplateStatus(enum.StrEnum):
|
||||
# Adapted from ovirtsdk4
|
||||
ILLEGAL = 'illegal'
|
||||
LOCKED = 'locked'
|
||||
OK = 'ok'
|
||||
|
||||
# Custom value to represent the template is missing
|
||||
# Used on get_template_info
|
||||
UNKNOWN = 'unknown'
|
||||
|
||||
@staticmethod
|
||||
def from_str(status: str) -> 'TemplateStatus':
|
||||
try:
|
||||
return TemplateStatus(status)
|
||||
except ValueError:
|
||||
return TemplateStatus.ILLEGAL
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class StorageInfo:
|
||||
id: str
|
||||
@ -98,7 +117,7 @@ class StorageInfo:
|
||||
class DatacenterInfo:
|
||||
name: str
|
||||
id: str
|
||||
storage_type: str
|
||||
local_storage: bool
|
||||
description: str
|
||||
storage: list[StorageInfo]
|
||||
|
||||
@ -107,7 +126,7 @@ class DatacenterInfo:
|
||||
return DatacenterInfo(
|
||||
name=datacenter.name,
|
||||
id=datacenter.id,
|
||||
storage_type=datacenter.local and 'local' or 'shared',
|
||||
local_storage=datacenter.local,
|
||||
description=datacenter.description,
|
||||
storage=storage,
|
||||
)
|
||||
@ -136,7 +155,7 @@ class VMInfo:
|
||||
usb_enabled: bool
|
||||
# usb legacy is not supported anymore, so we only have "native"
|
||||
# and does not needs a separate field
|
||||
status: VmStatus
|
||||
status: VMStatus
|
||||
|
||||
@staticmethod
|
||||
def from_data(vm: typing.Any) -> 'VMInfo':
|
||||
@ -149,5 +168,32 @@ class VMInfo:
|
||||
id=vm.id,
|
||||
cluster_id=vm.cluster.id,
|
||||
usb_enabled=usb_enabled,
|
||||
status=VmStatus.from_str(vm.status.value),
|
||||
status=VMStatus.from_str(vm.status.value),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def missing() -> 'VMInfo':
|
||||
return VMInfo(name='', id='', cluster_id='', usb_enabled=False, status=VMStatus.UNKNOWN)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class TemplateInfo:
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
cluster_id: str
|
||||
status: TemplateStatus
|
||||
|
||||
@staticmethod
|
||||
def from_data(template: typing.Any) -> 'TemplateInfo':
|
||||
return TemplateInfo(
|
||||
id=template.id,
|
||||
name=template.name,
|
||||
description=template.description,
|
||||
cluster_id=template.cluster.id,
|
||||
status=TemplateStatus.from_str(template.status.value),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def missing() -> 'TemplateInfo':
|
||||
return TemplateInfo(id='', name='', description='', cluster_id='', status=TemplateStatus.UNKNOWN)
|
||||
|
@ -40,6 +40,8 @@ from uds.core.services import Publication
|
||||
from uds.core import types
|
||||
from uds.core.util import autoserializable
|
||||
|
||||
from .ovirt import types as ov_types
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from .service import OVirtLinkedService
|
||||
@ -101,7 +103,7 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
|
||||
self._state = 'locked'
|
||||
|
||||
try:
|
||||
self._template_id = self.service().make_template(self._name, comments)
|
||||
self._template_id = self.service().make_template(self._name, comments).id
|
||||
except Exception as e:
|
||||
self._state = 'error'
|
||||
self._reason = str(e)
|
||||
@ -120,21 +122,23 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
|
||||
return types.states.State.ERROR
|
||||
|
||||
try:
|
||||
self._state = self.service().get_template_state(self._template_id)
|
||||
if self._state == 'removed':
|
||||
status = self.service().provider().api.get_template_info(self._template_id).status
|
||||
if status == ov_types.TemplateStatus.UNKNOWN:
|
||||
raise Exception('Template has been removed!')
|
||||
except Exception as e:
|
||||
self._state = 'error'
|
||||
self._reason = str(e)
|
||||
return types.states.State.ERROR
|
||||
|
||||
# If publication os done (template is ready), and cancel was requested, do it just after template becomes ready
|
||||
if self._state == 'ok':
|
||||
if status == ov_types.TemplateStatus.OK:
|
||||
self._state = 'ok'
|
||||
if self._destroy_after:
|
||||
self._destroy_after = False
|
||||
return self.destroy()
|
||||
return types.states.State.FINISHED
|
||||
|
||||
except Exception as e:
|
||||
self._state = 'error'
|
||||
self._reason = str(e)
|
||||
return types.states.State.ERROR
|
||||
|
||||
return types.states.State.RUNNING
|
||||
|
||||
def error_reason(self) -> str:
|
||||
@ -164,7 +168,7 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
|
||||
return types.states.State.RUNNING
|
||||
|
||||
try:
|
||||
self.service().remove_template(self._template_id)
|
||||
self.service().provider().api.remove_template(self._template_id)
|
||||
except Exception as e:
|
||||
self._state = 'error'
|
||||
self._reason = str(e)
|
||||
|
@ -41,13 +41,16 @@ from uds.core.util import validators, fields
|
||||
from uds.core.ui import gui
|
||||
|
||||
from .publication import OVirtPublication
|
||||
from .deployment import OVirtLinkedDeployment
|
||||
from .deployment import OVirtLinkedUserService
|
||||
from . import helpers
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from .provider import OVirtProvider
|
||||
|
||||
|
||||
from .ovirt import types as ov_types
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -97,7 +100,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
# : In our case, we do no need a publication, so this is None
|
||||
publication_type = OVirtPublication
|
||||
# : Types of deploys (services in cache and/or assigned to users)
|
||||
user_service_type = OVirtLinkedDeployment
|
||||
user_service_type = OVirtLinkedUserService
|
||||
|
||||
allowed_protocols = types.transports.Protocol.generic_vdi(types.transports.Protocol.SPICE)
|
||||
services_type_provided = types.services.ServiceType.VDI
|
||||
@ -254,7 +257,10 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
"""
|
||||
return re.sub("[^a-zA-Z0-9_-]", "_", name)
|
||||
|
||||
def make_template(self, name: str, comments: str) -> str:
|
||||
class TemplateInfo:
|
||||
pass
|
||||
|
||||
def make_template(self, name: str, comments: str) -> ov_types.TemplateInfo:
|
||||
"""
|
||||
Invokes makeTemplate from parent provider, completing params
|
||||
|
||||
@ -281,20 +287,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
self.display.value,
|
||||
)
|
||||
|
||||
def get_template_state(self, template_id: str) -> str:
|
||||
"""
|
||||
Invokes getTemplateState from parent provider
|
||||
|
||||
Args:
|
||||
templateId: templateId to remove
|
||||
|
||||
Returns nothing
|
||||
|
||||
Raises an exception if operation fails.
|
||||
"""
|
||||
return self.provider().api.get_template_state(template_id)
|
||||
|
||||
def deploy_from_template(self, name: str, comments: str, templateId: str) -> str:
|
||||
def deploy_from_template(self, name: str, comments: str, template_id: str) -> ov_types.VMInfo:
|
||||
"""
|
||||
Deploys a virtual machine on selected cluster from selected template
|
||||
|
||||
@ -307,14 +300,14 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
guaranteedMB: Minimum memory guaranteed for this machine
|
||||
|
||||
Returns:
|
||||
Id of the machine being created form template
|
||||
Info of the deployed machine
|
||||
"""
|
||||
logger.debug('Deploying from template %s machine %s', templateId, name)
|
||||
logger.debug('Deploying from template %s machine %s', template_id, name)
|
||||
self.verify_free_storage()
|
||||
return self.provider().api.deploy_from_template(
|
||||
name,
|
||||
comments,
|
||||
templateId,
|
||||
template_id,
|
||||
self.cluster.value,
|
||||
self.display.value,
|
||||
self.usb.value,
|
||||
@ -322,82 +315,6 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
int(self.guaranteed_memory.value),
|
||||
)
|
||||
|
||||
def remove_template(self, template_id: str) -> None:
|
||||
"""
|
||||
invokes removeTemplate from parent provider
|
||||
"""
|
||||
self.provider().api.remove_template(template_id)
|
||||
|
||||
def get_machine_state(self, machine_id: str) -> str:
|
||||
"""
|
||||
Invokes getMachineState from parent provider
|
||||
(returns if machine is "active" or "inactive"
|
||||
|
||||
Args:
|
||||
machineId: If of the machine to get state
|
||||
|
||||
Returns:
|
||||
one of this values:
|
||||
unassigned, down, up, powering_up, powered_down,
|
||||
paused, migrating_from, migrating_to, unknown, not_responding,
|
||||
wait_for_launch, reboot_in_progress, saving_state, restoring_state,
|
||||
suspended, image_illegal, image_locked or powering_down
|
||||
Also can return'unknown' if Machine is not known
|
||||
"""
|
||||
return self.provider().api.get_machine_state(machine_id)
|
||||
|
||||
def start_machine(self, machine_id: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to oVirt.
|
||||
|
||||
This start also "resume" suspended/paused machines
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.provider().api.start_machine(machine_id)
|
||||
|
||||
def stop_machine(self, machine_id: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to oVirt
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.provider().api.stop_machine(machine_id)
|
||||
|
||||
def suspend_machine(self, machine_id: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to oVirt
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.provider().api.suspend_machine(machine_id)
|
||||
|
||||
def remove_machine(self, machine_id: str) -> None:
|
||||
"""
|
||||
Tries to delete a machine. No check is done, it is simply requested to oVirt
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.provider().api.remove_machine(machine_id)
|
||||
|
||||
def update_machine_mac(self, machine_id: str, mac: str) -> None:
|
||||
"""
|
||||
Changes the mac address of first nic of the machine to the one specified
|
||||
"""
|
||||
self.provider().api.update_machine_mac(machine_id, mac)
|
||||
|
||||
def fix_usb(self, machine_id: str) -> None:
|
||||
# If has usb, upgrade vm to add it now
|
||||
if self.usb.value in ('native',):
|
||||
@ -428,7 +345,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
|
||||
return self.display.value
|
||||
|
||||
def get_console_connection(self, machine_id: str) -> typing.Optional[types.services.ConsoleConnectionInfo]:
|
||||
return self.provider().api.get_console_connection(machine_id)
|
||||
return self.provider().api.get_console_connection_info(machine_id)
|
||||
|
||||
def is_avaliable(self) -> bool:
|
||||
return self.provider().is_available()
|
||||
|
@ -262,35 +262,37 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.get_server,
|
||||
method=lambda server_id: get_id(SERVERS_LIST, server_id), # pyright: ignore
|
||||
return_value=lambda server_id: get_id(SERVERS_LIST, server_id), # pyright: ignore
|
||||
), # pyright: ignore
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.get_volume,
|
||||
method=lambda volume_id: get_id(VOLUMES_LIST, volume_id), # pyright: ignore
|
||||
return_value=lambda volume_id: get_id(VOLUMES_LIST, volume_id), # pyright: ignore
|
||||
), # pyright: ignore
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.get_volume_snapshot,
|
||||
method=lambda snapshot_id: get_id(VOLUME_SNAPSHOTS_LIST, snapshot_id), # pyright: ignore
|
||||
return_value=lambda snapshot_id: get_id(VOLUME_SNAPSHOTS_LIST, snapshot_id), # pyright: ignore
|
||||
), # pyright: ignore
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.update_snapshot,
|
||||
method=lambda snapshot_id, name, description: get_id( # pyright: ignore
|
||||
return_value=lambda snapshot_id, name, description: get_id( # pyright: ignore
|
||||
VOLUME_SNAPSHOTS_LIST, snapshot_id # pyright: ignore
|
||||
),
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.create_volume_snapshot,
|
||||
method=lambda volume_id, name, description: random.choice(VOLUME_SNAPSHOTS_LIST), # pyright: ignore
|
||||
return_value=lambda volume_id, name, description: random.choice( # pyright: ignore
|
||||
VOLUME_SNAPSHOTS_LIST,
|
||||
),
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.create_volume_from_snapshot,
|
||||
method=lambda snapshot_id, name, description: get_id( # pyright: ignore
|
||||
return_value=lambda snapshot_id, name, description: get_id( # pyright: ignore
|
||||
VOLUMES_LIST, f'vid{len(VOLUMES_LIST) + 1}'
|
||||
),
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.create_server_from_snapshot,
|
||||
method=lambda *args, **kwargs: random.choice(SERVERS_LIST), # pyright: ignore
|
||||
return_value=lambda *args, **kwargs: random.choice(SERVERS_LIST), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
openstack_client.OpenstackClient.test_connection,
|
||||
@ -305,7 +307,7 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
# AutoSpecMethodInfo(client.Client.list_projects, return_value=True),
|
||||
# AutoSpecMethodInfo(
|
||||
# client.ProxmoxClient.get_node_stats,
|
||||
# method=lambda node, **kwargs: next(filter(lambda n: n.name == node, NODE_STATS)), # pyright: ignore
|
||||
# return_value=lambda node, **kwargs: next(filter(lambda n: n.name == node, NODE_STATS)), # pyright: ignore
|
||||
# ),
|
||||
]
|
||||
|
||||
|
298
server/tests/services/ovirt/fixtures.py
Normal file
298
server/tests/services/ovirt/fixtures.py
Normal file
@ -0,0 +1,298 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2024 Virtual Cable S.L.U.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
# are permitted provided that the following conditions are met:
|
||||
#
|
||||
# * Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
# * Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
|
||||
# may be used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
"""
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import contextlib
|
||||
import enum
|
||||
import typing
|
||||
import uuid
|
||||
import random
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from uds.core import environment, types
|
||||
from uds.core.ui.user_interface import gui
|
||||
|
||||
from ...utils.autospec import autospec, AutoSpecMethodInfo
|
||||
|
||||
from uds.services.OVirt import (
|
||||
provider,
|
||||
service,
|
||||
publication,
|
||||
deployment,
|
||||
)
|
||||
from uds.services.OVirt.ovirt import client, types as ov_types
|
||||
|
||||
T = typing.TypeVar('T')
|
||||
V = typing.TypeVar('V', bound=enum.Enum)
|
||||
|
||||
|
||||
def from_list(l: typing.List[T], index: int) -> T:
|
||||
return l[index % len(l)]
|
||||
|
||||
|
||||
def from_enum(e: typing.Type[V], index: int = -1) -> V:
|
||||
if index == -1:
|
||||
index = random.randint(0, len(e) - 1)
|
||||
return list(e)[index % len(e)]
|
||||
|
||||
|
||||
def get_id(iterable: typing.Iterable[T], id: str) -> T:
|
||||
try:
|
||||
return next(filter(lambda x: x.id == id, iterable)) # type: ignore
|
||||
except StopIteration:
|
||||
raise ValueError(f'Id {id} not found in iterable') from None
|
||||
|
||||
|
||||
GUEST_IP_ADDRESS: str = '1.0.0.1'
|
||||
|
||||
STORAGES_INFO: list[ov_types.StorageInfo] = [
|
||||
ov_types.StorageInfo(
|
||||
id=f'stid-{i}',
|
||||
name=f'storage-{i}',
|
||||
type=from_enum(ov_types.StorageType, i),
|
||||
available=i * 1024 * 1024 * 1024,
|
||||
used=i * 1024 * 1024 * 1024 // 2,
|
||||
status=from_list([ov_types.StorageStatus.ACTIVE, ov_types.StorageStatus.INACTIVE], i),
|
||||
)
|
||||
for i in range(64)
|
||||
]
|
||||
|
||||
|
||||
DATACENTERS_INFO: list[ov_types.DatacenterInfo] = [
|
||||
ov_types.DatacenterInfo(
|
||||
name=f'datacenter-{i}',
|
||||
id=f'dcid-{i}',
|
||||
local_storage=bool(i % 2),
|
||||
description='The default Data Center',
|
||||
storage=[from_list(STORAGES_INFO, i * 2 + j) for j in range(4)],
|
||||
)
|
||||
for i in range(4)
|
||||
]
|
||||
|
||||
CLUSTERS_INFO: list[ov_types.ClusterInfo] = [
|
||||
ov_types.ClusterInfo(
|
||||
name=f'cluster-{i}',
|
||||
id=f'clid-{i}',
|
||||
datacenter_id=from_list(DATACENTERS_INFO, i // 2).id,
|
||||
)
|
||||
for i in range(4)
|
||||
]
|
||||
|
||||
|
||||
VMS_INFO: list[ov_types.VMInfo] = [
|
||||
ov_types.VMInfo(
|
||||
name=f'vm-{i}',
|
||||
id=f'vmid-{i}',
|
||||
cluster_id=from_list(CLUSTERS_INFO, i // 6).id,
|
||||
usb_enabled=True,
|
||||
status=ov_types.VMStatus.DOWN,
|
||||
)
|
||||
for i in range(32)
|
||||
]
|
||||
|
||||
TEMPLATES_INFO: list[ov_types.TemplateInfo] = [
|
||||
ov_types.TemplateInfo(
|
||||
id=f'tid-{i}',
|
||||
name=f'template-{i}',
|
||||
description=f'Template {i} description',
|
||||
cluster_id=from_list(CLUSTERS_INFO, i // 8).id,
|
||||
status=from_list([ov_types.TemplateStatus.OK, ov_types.TemplateStatus.UNKNOWN], i // 2),
|
||||
)
|
||||
for i in range(16)
|
||||
]
|
||||
|
||||
CONSOLE_CONNECTION_INFO: types.services.ConsoleConnectionInfo = types.services.ConsoleConnectionInfo(
|
||||
type='spice',
|
||||
address=GUEST_IP_ADDRESS,
|
||||
port=5900,
|
||||
secure_port=5901,
|
||||
cert_subject='',
|
||||
ticket=types.services.ConsoleConnectionTicket(value='ticket'),
|
||||
ca='',
|
||||
proxy='',
|
||||
monitors=1,
|
||||
)
|
||||
|
||||
|
||||
# Methods that returns None or "internal" methods are not tested
|
||||
# The idea behind this is to allow testing the provider, service and deployment classes
|
||||
# without the need of a real OpenStack environment
|
||||
CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
AutoSpecMethodInfo(client.Client.list_machines, return_value=VMS_INFO),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_machine_info,
|
||||
lambda machine_id, **kwargs: get_id(VMS_INFO, machine_id), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(client.Client.list_clusters, return_value=CLUSTERS_INFO),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_cluster_info,
|
||||
lambda cluster_id, **kwargs: get_id(CLUSTERS_INFO, cluster_id), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_datacenter_info,
|
||||
lambda datacenter_id, **kwargs: get_id( # pyright: ignore
|
||||
DATACENTERS_INFO,
|
||||
datacenter_id, # pyright: ignore
|
||||
),
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_storage_info,
|
||||
lambda storage_id, **kwargs: get_id(STORAGES_INFO, storage_id), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.create_template,
|
||||
return_value=lambda *args, **kwargs: random.choice(TEMPLATES_INFO), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_template_info,
|
||||
lambda template_id, **kwargs: get_id(TEMPLATES_INFO, template_id), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.deploy_from_template,
|
||||
return_value=lambda *args, **kwargs: random.choice(VMS_INFO), # pyright: ignore
|
||||
),
|
||||
AutoSpecMethodInfo(
|
||||
client.Client.get_console_connection_info,
|
||||
return_value=CONSOLE_CONNECTION_INFO,
|
||||
),
|
||||
|
||||
# connect returns None
|
||||
# Test method
|
||||
# AutoSpecMethodInfo(client.Client.list_projects, returns=True),
|
||||
# AutoSpecMethodInfo(
|
||||
# client.ProxmoxClient.get_node_stats,
|
||||
# returns=lambda node, **kwargs: next(filter(lambda n: n.name == node, NODE_STATS)), # pyright: ignore
|
||||
# ),
|
||||
]
|
||||
|
||||
PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
|
||||
'ovirt_version': '4',
|
||||
'host': 'hoet.example.com',
|
||||
'username': 'admin@ovirt@internalsso',
|
||||
'password': 'the_testing_pass',
|
||||
'concurrent_creation_limit': 33,
|
||||
'concurrent_removal_limit': 13,
|
||||
'timeout': 176,
|
||||
'macs_range': '52:54:00:F0:F0:00-52:54:00:F0:FF:FF',
|
||||
}
|
||||
|
||||
SERVICE_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
|
||||
'cluster': '5a5e5021-00d9-031e-0132-000000000044',
|
||||
'datastore': 'dd30d1d7-3c8d-4c47-ad95-6b572f635091',
|
||||
'reserved_storage_gb': 2,
|
||||
'machine': '78877a7e-005f-4418-a26a-9027a41a32a4',
|
||||
'memory': 256,
|
||||
'guaranteed_memory': 128,
|
||||
'usb': 'native',
|
||||
'display': 'spice',
|
||||
'basename': 'noso',
|
||||
'lenname': 5,
|
||||
'parent_uuid': '',
|
||||
}
|
||||
|
||||
|
||||
def create_client_mock() -> mock.Mock:
|
||||
"""
|
||||
Create a mock of ProxmoxClient
|
||||
"""
|
||||
return autospec(client.Client, CLIENT_METHODS_INFO)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def patch_provider_api(
|
||||
**kwargs: typing.Any,
|
||||
) -> typing.Generator[mock.Mock, None, None]:
|
||||
client = create_client_mock()
|
||||
try:
|
||||
mock.patch('uds.services.OVirt.provider.OVirtProvider.api', returns=client).start()
|
||||
yield client
|
||||
finally:
|
||||
mock.patch.stopall()
|
||||
|
||||
|
||||
def create_provider(**kwargs: typing.Any) -> provider.OVirtProvider:
|
||||
"""
|
||||
Create a provider
|
||||
"""
|
||||
values = PROVIDER_VALUES_DICT.copy()
|
||||
values.update(kwargs)
|
||||
|
||||
uuid_ = str(uuid.uuid4())
|
||||
return provider.OVirtProvider(
|
||||
environment=environment.Environment.private_environment(uuid), values=values, uuid=uuid_
|
||||
)
|
||||
|
||||
|
||||
def create_linked_service(provider: provider.OVirtProvider, **kwargs: typing.Any) -> service.OVirtLinkedService:
|
||||
"""
|
||||
Create a service
|
||||
"""
|
||||
values = SERVICE_VALUES_DICT.copy()
|
||||
values.update(kwargs)
|
||||
|
||||
uuid_ = str(uuid.uuid4())
|
||||
return service.OVirtLinkedService(
|
||||
provider=provider,
|
||||
environment=environment.Environment.private_environment(uuid),
|
||||
values=values,
|
||||
uuid=uuid_,
|
||||
)
|
||||
|
||||
|
||||
def create_publication(service: service.OVirtLinkedService) -> publication.OVirtPublication:
|
||||
"""
|
||||
Create a publication
|
||||
"""
|
||||
uuid_ = str(uuid.uuid4())
|
||||
return publication.OVirtPublication(
|
||||
environment=environment.Environment.private_environment(uuid_),
|
||||
service=service,
|
||||
revision=1,
|
||||
servicepool_name='servicepool_name',
|
||||
uuid=uuid_,
|
||||
)
|
||||
|
||||
|
||||
def create_linked_userservice(
|
||||
service: service.OVirtLinkedService,
|
||||
publication: typing.Optional[publication.OVirtPublication] = None,
|
||||
) -> deployment.OVirtLinkedUserService:
|
||||
"""
|
||||
Create a linked user service
|
||||
"""
|
||||
uuid_ = str(uuid.uuid4())
|
||||
return deployment.OVirtLinkedUserService(
|
||||
environment=environment.Environment.private_environment(uuid_),
|
||||
service=service,
|
||||
publication=publication or create_publication(service),
|
||||
uuid=uuid_,
|
||||
)
|
@ -315,18 +315,18 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
# get_machine_pool_info
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_machine_pool_info,
|
||||
method=lambda vmid, poolid, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
|
||||
), # pyright: ignore
|
||||
return_value=lambda vmid, poolid, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
|
||||
),
|
||||
# get_machine_info
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_machine_info,
|
||||
method=lambda vmid, *args, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
|
||||
), # pyright: ignore
|
||||
return_value=lambda vmid, *args, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
|
||||
),
|
||||
# get_machine_configuration
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_machine_configuration,
|
||||
method=lambda vmid, **kwargs: VMS_CONFIGURATION[vmid - 1], # pyright: ignore
|
||||
), # pyright: ignore
|
||||
return_value=lambda vmid, **kwargs: VMS_CONFIGURATION[vmid - 1], # pyright: ignore
|
||||
),
|
||||
# enable_machine_ha return None
|
||||
# start_machine
|
||||
AutoSpecMethodInfo(client.ProxmoxClient.start_machine, return_value=UPID),
|
||||
@ -345,14 +345,14 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
# get_storage
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_storage,
|
||||
method=lambda storage, node, **kwargs: next( # pyright: ignore
|
||||
return_value=lambda storage, node, **kwargs: next( # pyright: ignore
|
||||
filter(lambda s: s.storage == storage, STORAGES) # pyright: ignore
|
||||
), # pyright: ignore
|
||||
),
|
||||
# list_storages
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.list_storages,
|
||||
method=lambda node, **kwargs: ( # pyright: ignore
|
||||
return_value=lambda node, **kwargs: ( # pyright: ignore
|
||||
(list(filter(lambda s: s.node == node, STORAGES))) # pyright: ignore
|
||||
if node is not None
|
||||
else STORAGES # pyright: ignore
|
||||
@ -361,14 +361,18 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
|
||||
# get_node_stats
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_node_stats,
|
||||
method=lambda node, **kwargs: next(filter(lambda n: n.name == node, NODE_STATS)), # pyright: ignore
|
||||
return_value=lambda node, **kwargs: next( # pyright: ignore
|
||||
filter(lambda n: n.name == node, NODE_STATS) # pyright: ignore
|
||||
),
|
||||
),
|
||||
# list_pools
|
||||
AutoSpecMethodInfo(client.ProxmoxClient.list_pools, return_value=POOLS),
|
||||
# get_pool_info
|
||||
AutoSpecMethodInfo(
|
||||
client.ProxmoxClient.get_pool_info,
|
||||
method=lambda poolid, **kwargs: next(filter(lambda p: p.poolid == poolid, POOLS)), # pyright: ignore
|
||||
return_value=lambda poolid, **kwargs: next( # pyright: ignore
|
||||
filter(lambda p: p.poolid == poolid, POOLS) # pyright: ignore
|
||||
),
|
||||
),
|
||||
# get_console_connection
|
||||
AutoSpecMethodInfo(client.ProxmoxClient.get_console_connection, return_value=CONSOLE_CONNECTION_INFO),
|
||||
|
@ -36,8 +36,7 @@ from unittest import mock
|
||||
@dataclasses.dataclass
|
||||
class AutoSpecMethodInfo:
|
||||
name: str|typing.Callable[..., typing.Any]
|
||||
return_value: typing.Any = None
|
||||
method: 'typing.Callable[..., typing.Any]|None' = None
|
||||
return_value: typing.Any = None # Can be a callable or a value
|
||||
|
||||
|
||||
def autospec(cls: type, metods_info: collections.abc.Iterable[AutoSpecMethodInfo], **kwargs: typing.Any) -> mock.Mock:
|
||||
@ -55,8 +54,8 @@ def autospec(cls: type, metods_info: collections.abc.Iterable[AutoSpecMethodInfo
|
||||
# Set the return value for the method or the side_effect
|
||||
name = method_info.name if isinstance(method_info.name, str) else method_info.name.__name__
|
||||
mck = getattr(obj, name)
|
||||
if method_info.method is not None:
|
||||
mck.side_effect = method_info.method
|
||||
if callable(method_info.return_value):
|
||||
mck.side_effect = method_info.return_value
|
||||
else:
|
||||
mck.return_value = method_info.return_value
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user