1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-21 18:03:54 +03:00

Update Proxmox service provider and related files

This commit is contained in:
Adolfo Gómez García 2024-02-19 17:28:30 +01:00
parent 202444f64e
commit bf8b40be6f
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
8 changed files with 218 additions and 196 deletions

View File

@ -88,3 +88,6 @@ BROWSER_RULES: dict[types.os.KnownBrowser, tuple] = {
types.os.KnownBrowser.SEAMONKEY: (types.os.KnownBrowser.SEAMONKEY, (types.os.KnownBrowser.FIREFOX,)),
types.os.KnownBrowser.OPERA: (types.os.KnownBrowser.OPERA, ()),
}
# Max wait time for guest shutdown
MAX_GUEST_SHUTDOWN_WAIT: typing.Final[int] = 90 # Seconds

View File

@ -64,8 +64,9 @@ class Operation(enum.IntEnum):
SNAPSHOT_CREATE = 8 # to recall process_snapshot
SNAPSHOT_RECOVER = 9 # to recall process_snapshot
PROCESS_TOKEN = 10
NOP = 11
SOFT_SHUTDOWN = 11
NOP = 98
UNKNOWN = 99
@staticmethod
@ -224,6 +225,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
Operation.SNAPSHOT_CREATE: self._snapshot_create,
Operation.SNAPSHOT_RECOVER: self._snapshot_recover,
Operation.PROCESS_TOKEN: self._process_token,
Operation.SOFT_SHUTDOWN: self._soft_shutdown_machine,
Operation.NOP: self._nop,
}
@ -257,7 +259,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
Executes opWait, it simply waits something "external" to end
"""
pass
@typing.final
def _nop(self) -> None:
"""
@ -332,31 +334,42 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
def _wait_checker(self) -> str:
    """Checker for the WAIT operation: there is nothing to poll, so it is immediately finished."""
    return State.FINISHED
def _nop_checker(self) -> str:
    """Checker for the NOP operation: a no-op has nothing to wait for, so it is immediately finished."""
    return State.FINISHED
@abc.abstractmethod
def _start_machine(self) -> None:
"""
Override this method to start the machine if needed
"""
pass
def _start_checker(self) -> str:
"""
Checks if machine has started
"""
raise NotImplementedError()
return State.FINISHED
@abc.abstractmethod
def _start_machine(self) -> None:
raise NotImplementedError()
@abc.abstractmethod
def _stop_machine(self) -> None:
raise NotImplementedError()
"""
Override this method to stop the machine if needed
"""
pass
@abc.abstractmethod
def _stop_checker(self) -> str:
"""
Checks if machine has stoped
"""
raise NotImplementedError()
return State.FINISHED
# Not abstract methods, defaults to stop machine
def _soft_shutdown_machine(self) -> None:
    """
    Executes the SOFT_SHUTDOWN operation.

    Default implementation simply delegates to _stop_machine (a hard stop).
    Subclasses that support a guest-side graceful shutdown should override
    this method (and _soft_shutdown_checker) accordingly.
    """
    return self._stop_machine()  # Default is to stop the machine
def _soft_shutdown_checker(self) -> str:
    """Checks progress of SOFT_SHUTDOWN; by default delegates to the stop checker."""
    return self._stop_checker()  # Default is to check if machine has stopped
def _removed_checker(self) -> str:
"""
@ -388,6 +401,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
Operation.SNAPSHOT_CREATE: self._snapshot_create_checker,
Operation.SNAPSHOT_RECOVER: self._snapshot_recover_checker,
Operation.PROCESS_TOKEN: self._process_token_checker,
Operation.SOFT_SHUTDOWN: self._soft_shutdown_checker,
Operation.NOP: self._nop_checker,
}

View File

@ -81,6 +81,7 @@ class Operation(enum.IntEnum):
except ValueError:
return Operation.opUnknown
# The difference between "SHUTDOWN" and "GRACEFUL_STOP" is that the first one
# is used to "best try to stop" the machine to move to L2 (that is, if it cannot be stopped,
# it will be moved to L2 anyway, but keeps running), and the second one is used to "best try to stop"
@ -88,7 +89,6 @@ class Operation(enum.IntEnum):
# timeout of at most GUEST_SHUTDOWN_WAIT seconds)
# UP_STATES = ('up', 'reboot_in_progress', 'powering_up', 'restoring_state')
GUEST_SHUTDOWN_WAIT = 90 # Seconds
class ProxmoxDeployment(services.UserService, autoserializable.AutoSerializable):
@ -329,7 +329,7 @@ if sys.platform == 'win32':
if op == Operation.FINISH:
return State.FINISHED
fncs: collections.abc.Mapping[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
fncs: collections.abc.Mapping[Operation, typing.Optional[collections.abc.Callable[[], None]]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
@ -342,7 +342,7 @@ if sys.platform == 'win32':
}
try:
operation_executor: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
operation_executor: typing.Optional[collections.abc.Callable[[], None]] = fncs.get(op, None)
if operation_executor is None:
return self._error(f'Unknown operation found at execution queue ({op})')
@ -354,7 +354,7 @@ if sys.platform == 'win32':
return self._error(e)
# Queue execution methods
def _retry(self) -> str:
def _retry(self) -> None:
"""
Used to retry an operation
In fact, this will not be never invoked, unless we push it twice, because
@ -362,15 +362,27 @@ if sys.platform == 'win32':
At executeQueue this return value will be ignored, and it will only be used at check_state
"""
pass
def _retry_checker(self) -> str:
    """
    Checker for the RETRY operation.

    This method is not used, because retry operation is never used
    (RETRY is effectively a queue placeholder); it returns FINISHED so the
    queue can advance if it is ever reached.
    """
    return State.FINISHED
def _wait(self) -> str:
def _wait(self) -> None:
"""
Executes opWait, it simply waits something "external" to end
"""
return State.RUNNING
pass
def _wait_checker(self) -> str:
    """
    This method is not used, because wait operation is never used
    in practice; it returns FINISHED so the queue can advance if reached.
    """
    return State.FINISHED
def _create(self) -> str:
def _create(self) -> None:
"""
Deploys a machine from template for user/cache
"""
@ -389,41 +401,39 @@ if sys.platform == 'win32':
self._vmid = str(task_result.vmid)
return State.RUNNING
def _remove(self) -> str:
def _remove(self) -> None:
"""
Removes a machine from system
"""
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
vm_info = self.service().get_machine_info(int(self._vmid))
except Exception as e:
raise Exception('Machine not found on remove machine') from e
if vmInfo.status != 'stopped':
logger.debug('Info status: %s', vmInfo)
if vm_info.status != 'stopped':
logger.debug('Info status: %s', vm_info)
self._queue = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
return self._execute_queue()
self._execute_queue()
self._store_task(self.service().remove_machine(int(self._vmid)))
return State.RUNNING
def _start_machine(self) -> str:
def _start_machine(self) -> None:
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
vm_info = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
return self._retry_later()
self._retry_later()
return
except Exception as e:
raise Exception('Machine not found on start machine') from e
if vmInfo.status == 'stopped':
if vm_info.status == 'stopped':
self._store_task(self.service().provider().start_machine(int(self._vmid)))
return State.RUNNING
def _stop_machine(self) -> str:
def _stop_machine(self) -> None:
try:
vm_info = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
self._retry_later()
return
except Exception as e:
raise Exception('Machine not found on stop machine') from e
@ -431,22 +441,19 @@ if sys.platform == 'win32':
logger.debug('Stopping machine %s', vm_info)
self._store_task(self.service().provider().stop_machine(int(self._vmid)))
return State.RUNNING
def _shutdown_machine(self) -> str:
def _shutdown_machine(self) -> None:
try:
vmInfo = self.service().get_machine_info(int(self._vmid))
vm_info = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
return State.RUNNING # Try again later
self._retry_later()
return
except Exception as e:
raise Exception('Machine not found or suspended machine') from e
if vmInfo.status != 'stopped':
if vm_info.status != 'stopped':
self._store_task(self.service().provider().shutdown_machine(int(self._vmid)))
return State.RUNNING
def _gracely_stop(self) -> str:
def _gracely_stop(self) -> None:
"""
Tries to stop machine using qemu guest tools
If it takes too long to stop, or qemu guest tools are not installed,
@ -454,24 +461,32 @@ if sys.platform == 'win32':
"""
self._task = ''
shutdown = -1 # Means machine already stopped
vmInfo = self.service().get_machine_info(int(self._vmid))
if vmInfo.status != 'stopped':
try:
vm_info = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
self._retry_later()
return
except Exception as e:
raise Exception('Machine not found on stop machine') from e
if vm_info.status != 'stopped':
self._store_task(self.service().provider().shutdown_machine(int(self._vmid)))
shutdown = sql_stamp_seconds()
logger.debug('Stoped vm using guest tools')
self.storage.put_pickle('shutdown', shutdown)
return State.RUNNING
def _update_machine_mac_and_ha(self) -> str:
def _update_machine_mac_and_ha(self) -> None:
try:
self.service().enable_ha(int(self._vmid), True) # Enable HA before continuing here
# Set vm mac address now on first interface
self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
except client.ProxmoxConnectionError:
self._retry_later()
return
except Exception as e:
logger.exception('Setting HA and MAC on proxmox')
raise Exception(f'Error setting MAC and HA on proxmox: {e}') from e
return State.RUNNING
# Check methods
def _check_task_finished(self) -> str:
@ -538,15 +553,15 @@ if sys.platform == 'win32':
return State.FINISHED # It's stopped
logger.debug('State is running')
if sql_stamp_seconds() - shutdown_start > GUEST_SHUTDOWN_WAIT:
if sql_stamp_seconds() - shutdown_start > consts.os.MAX_GUEST_SHUTDOWN_WAIT:
logger.debug('Time is consumed, falling back to stop')
self.do_log(
log.LogLevel.ERROR,
f'Could not shutdown machine using soft power off in time ({GUEST_SHUTDOWN_WAIT} seconds). Powering off.',
f'Could not shutdown machine using soft power off in time ({consts.os.MAX_GUEST_SHUTDOWN_WAIT} seconds). Powering off.',
)
# Not stopped by guest in time, but must be stopped normally
self.storage.put_pickle('shutdown', 0)
return self._stop_machine() # Launch "hard" stop
self._stop_machine() # Launch "hard" stop
return State.RUNNING
@ -579,8 +594,8 @@ if sys.platform == 'win32':
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry,
Operation.WAIT: self._wait,
Operation.RETRY: self._retry_checker,
Operation.WAIT: self._wait_checker,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.GRACEFUL_STOP: self._graceful_stop_checker,

View File

@ -36,10 +36,11 @@ import logging
import typing
import collections.abc
from uds.core import services
from uds.core import consts, services
from uds.core.services.specializations.fixed_machine.fixed_userservice import FixedUserService, Operation
from uds.core.types.states import State
from uds.core.util import log, autoserializable
from uds.core.util.model import sql_stamp_seconds
from . import client
@ -122,16 +123,6 @@ class ProxmoxFixedUserService(FixedUserService, autoserializable.AutoSerializabl
if vminfo.status == 'stopped':
self._store_task(self.service().provider().start_machine(int(self._vmid)))
def _stop_machine(self) -> None:
try:
vm_info = self.service().get_machine_info(int(self._vmid))
except Exception as e:
raise Exception('Machine not found on stop machine') from e
if vm_info.status != 'stopped':
logger.debug('Stopping machine %s', vm_info)
self._store_task(self.service().provider().stop_machine(int(self._vmid)))
# Check methods
def _check_task_finished(self) -> str:
if self._task == '':
@ -158,9 +149,3 @@ class ProxmoxFixedUserService(FixedUserService, autoserializable.AutoSerializabl
Checks if machine has started
"""
return self._check_task_finished()
def _stop_checker(self) -> str:
"""
Checks if machine has stoped
"""
return self._check_task_finished()

View File

@ -177,14 +177,14 @@ class ProxmoxProvider(services.ServiceProvider):
def list_storages(self, node: typing.Optional[str] = None, force: bool = False) -> list[client.types.StorageInfo]:
return self._api().list_storages(node=node, content='images', force=force)
def list_pools(self) -> list[client.types.PoolInfo]:
return self._api().list_pools()
def list_pools(self, force: bool = False) -> list[client.types.PoolInfo]:
return self._api().list_pools(force=force)
def get_pool_info(self, pool_id: str, retrieve_vm_names: bool = False) -> client.types.PoolInfo:
return self._api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names)
def get_pool_info(self, pool_id: str, retrieve_vm_names: bool = False, force: bool = False) -> client.types.PoolInfo:
    """Return info for *pool_id*; *retrieve_vm_names* and *force* are forwarded to the API client (force presumably skips cached data — confirm against client implementation)."""
    return self._api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names, force=force)
def create_template(self, vmid: int) -> None:
return self._api().convert_to_template(vmid)
self._api().convert_to_template(vmid)
def clone_machine(
self,
@ -253,13 +253,16 @@ class ProxmoxProvider(services.ServiceProvider):
return self._api().get_console_connection(int(machine_id), node)
def get_new_vmid(self) -> int:
while True: # look for an unused VmId
MAX_RETRIES: typing.Final[int] = 512 # So we don't loop forever, just in case...
vmid = 0
for _ in range(MAX_RETRIES):
vmid = self._vmid_generator.get(self.start_vmid.as_int(), MAX_VMID)
if self._api().is_vmid_available(vmid):
return vmid
# All assigned vmid will be left as unusable on UDS until released by time (3 years)
# This is not a problem at all, in the rare case that a machine id is released from uds db
# if it exists when we try to create a new one, we will simply try to get another one
raise client.ProxmoxError(f'Could not get a new vmid!!: last tried {vmid}')
def get_guest_ip_address(self, vmid: int, node: typing.Optional[str] = None) -> str:
    """Return the IP address of guest *vmid* as reported by the Proxmox API (NOTE(review): presumably requires the guest agent — confirm)."""
    return self._api().get_guest_ip_address(vmid, node)

View File

@ -124,7 +124,7 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
return self._state
node, upid = self._task.split(',')
try:
task = self.service().get_task_info(node, upid)
task = self.service().provider().get_task_info(node, upid)
if task.is_running():
return State.RUNNING
except Exception as e:
@ -142,7 +142,7 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
self._state = State.FINISHED
if self._operation == 'p': # not Destroying
# Disable Protection (removal)
self.service().set_protection(int(self._vm), protection=False)
self.service().provider().set_protection(int(self._vm), protection=False)
time.sleep(
0.5
) # Give some tome to proxmox. We have observed some concurrency issues
@ -150,7 +150,7 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
self.service().enable_ha(int(self._vm))
time.sleep(0.5)
# Mark vm as template
self.service().make_template(int(self._vm))
self.service().provider().create_template(int(self._vm))
# This seems to cause problems on Proxmox
# makeTemplate --> setProtection (that calls "config"). Seems that the HD dissapears...

View File

@ -128,6 +128,13 @@ VGPUS: typing.Final[list[client.types.VGPUInfo]] = [
),
]
HA_GROUPS: typing.Final[list[str]] = [
'ha_group_1',
'ha_group_2',
'ha_group_3',
'ha_group_4',
]
VMS_INFO: typing.Final[list[client.types.VMInfo]] = [
client.types.VMInfo(
status='status',
@ -163,7 +170,7 @@ VMS_CONFIGURATION: typing.Final[list[client.types.VMConfiguration]] = [
cores=1,
vmgenid='vmgenid',
digest='digest',
networks=[client.types.NetworkConfiguration(net='net', type='type', mac='mac')],
networks=[client.types.NetworkConfiguration(net='net', type='type', mac=f'{i:02x}:00:00:00:00:{i:02x}')],
tpmstate0='tpmstate0',
template=bool(i > 8), # Last two are templates
)
@ -235,9 +242,11 @@ POOLS: typing.Final[list[client.types.PoolInfo]] = [
for i in range(10)
]
GUEST_IP_ADDRESS: typing.Final[str] = '1.0.0.1'
CONSOLE_CONNECTION: typing.Final[types.services.ConsoleConnectionInfo] = types.services.ConsoleConnectionInfo(
type='spice',
address='2.2.2.2',
address=GUEST_IP_ADDRESS,
port=5900,
secure_port=5901,
cert_subject='',
@ -270,12 +279,12 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
# clone_machine
AutoSpecMethodInfo('clone_machine', return_value=VM_CREATION_RESULT),
# list_ha_groups
AutoSpecMethodInfo('list_ha_groups', return_value=['ha_group_1', 'ha_group_2']),
AutoSpecMethodInfo('list_ha_groups', return_value=HA_GROUPS),
# enable_machine_ha return None
# disable_machine_ha return None
# set_protection return None
# get_guest_ip_address
AutoSpecMethodInfo('get_guest_ip_address', return_value='1.0.0.1'),
AutoSpecMethodInfo('get_guest_ip_address', return_value=GUEST_IP_ADDRESS),
# remove_machine
AutoSpecMethodInfo('remove_machine', return_value=UPID),
# list_snapshots
@ -317,16 +326,26 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
AutoSpecMethodInfo('convert_to_template', return_value=UPID),
# get_storage
AutoSpecMethodInfo(
'get_storage', method=lambda storage, node, **kwargs: next(filter(lambda s: s.storage == storage, STORAGES))
'get_storage',
method=lambda storage, node, **kwargs: next(filter(lambda s: s.storage == storage, STORAGES)),
),
# list_storages
AutoSpecMethodInfo('list_storages', method=lambda node, **kwargs: (list(filter(lambda s: s.node == node, STORAGES))) if node is not None else STORAGES),
AutoSpecMethodInfo(
'list_storages',
method=lambda node, **kwargs: (
(list(filter(lambda s: s.node == node, STORAGES))) if node is not None else STORAGES
),
),
# get_node_stats
AutoSpecMethodInfo(
'get_node_stats', method=lambda node, **kwargs: next(filter(lambda n: n.name == node, NODE_STATS))
),
# list_pools
AutoSpecMethodInfo('list_pools', return_value=POOLS),
# get_pool_info
AutoSpecMethodInfo(
'get_pool_info', method=lambda poolid, **kwargs: next(filter(lambda p: p.poolid == poolid, POOLS))
),
# get_console_connection
AutoSpecMethodInfo('get_console_connection', return_value=CONSOLE_CONNECTION),
]

View File

@ -134,7 +134,7 @@ class TestProxmovProvider(UDSTestCase):
self.assertEqual(provider.test_connection(), True)
api.test.assert_called_once_with()
self.assertEqual(provider.list_machines(force=True), fixtures.VMS_INFO)
api.list_machines.assert_called_once_with(force=True)
api.list_machines.reset_mock()
@ -143,139 +143,122 @@ class TestProxmovProvider(UDSTestCase):
self.assertEqual(provider.get_machine_info(1), fixtures.VMS_INFO[0])
api.get_machine_pool_info.assert_called_once_with(1, None, force=True)
self.assertEqual(provider.get_machine_configuration(1), fixtures.VMS_CONFIGURATION[0])
api.get_machine_configuration.assert_called_once_with(1, force=True)
self.assertEqual(provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True), fixtures.STORAGES[2])
api.get_storage.assert_called_once_with(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True)
self.assertEqual(
provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True),
fixtures.STORAGES[2],
)
api.get_storage.assert_called_once_with(
fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True
)
api.get_storage.reset_mock()
self.assertEqual(provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node), fixtures.STORAGES[2])
api.get_storage.assert_called_once_with(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=False)
self.assertEqual(provider.list_storages(fixtures.STORAGES[2].node), list(filter(lambda x: x.node == fixtures.STORAGES[2].node, fixtures.STORAGES)))
api.list_storages.assert_called_once_with(node=fixtures.STORAGES[2].node, content='images', force=False)
self.assertEqual(
provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node),
fixtures.STORAGES[2],
)
api.get_storage.assert_called_once_with(
fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=False
)
self.assertEqual(
provider.list_storages(fixtures.STORAGES[2].node),
list(filter(lambda x: x.node == fixtures.STORAGES[2].node, fixtures.STORAGES)),
)
api.list_storages.assert_called_once_with(
node=fixtures.STORAGES[2].node, content='images', force=False
)
api.list_storages.reset_mock()
self.assertEqual(provider.list_storages(), fixtures.STORAGES)
api.list_storages.assert_called_once_with(node=None, content='images', force=False)
# def list_pools(self) -> list[client.types.PoolInfo]:
# return self._api().list_pools()
# def get_pool_info(self, pool_id: str, retrieve_vm_names: bool = False) -> client.types.PoolInfo:
# return self._api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names)
self.assertEqual(provider.list_pools(force=True), fixtures.POOLS)
api.list_pools.assert_called_once_with(force=True)
api.list_pools.reset_mock()
self.assertEqual(provider.list_pools(), fixtures.POOLS)
api.list_pools.assert_called_once_with(force=False)
# def create_template(self, vmid: int) -> None:
# return self._api().convert_to_template(vmid)
self.assertEqual(
provider.get_pool_info(fixtures.POOLS[2].poolid, retrieve_vm_names=True, force=True),
fixtures.POOLS[2],
)
api.get_pool_info.assert_called_once_with(
fixtures.POOLS[2].poolid, retrieve_vm_names=True, force=True
)
api.get_pool_info.reset_mock()
self.assertEqual(provider.get_pool_info(fixtures.POOLS[2].poolid), fixtures.POOLS[2])
api.get_pool_info.assert_called_once_with(
fixtures.POOLS[2].poolid, retrieve_vm_names=False, force=False
)
# def clone_machine(
# self,
# vmid: int,
# name: str,
# description: typing.Optional[str],
# as_linked_clone: bool,
# target_node: typing.Optional[str] = None,
# target_storage: typing.Optional[str] = None,
# target_pool: typing.Optional[str] = None,
# must_have_vgpus: typing.Optional[bool] = None,
# ) -> client.types.VmCreationResult:
# return self._api().clone_machine(
# vmid,
# self.get_new_vmid(),
# name,
# description,
# as_linked_clone,
# target_node,
# target_storage,
# target_pool,
# must_have_vgpus,
# )
provider.create_template(1)
api.convert_to_template.assert_called_once_with(1)
# def start_machine(self, vmid: int) -> client.types.UPID:
# return self._api().start_machine(vmid)
self.assertEqual(
provider.clone_machine(1, 'name', 'description', True, 'node', 'storage', 'pool', True),
fixtures.VM_CREATION_RESULT,
)
api.clone_machine.assert_called_once_with(
1, mock.ANY, 'name', 'description', True, 'node', 'storage', 'pool', True
)
# def stop_machine(self, vmid: int) -> client.types.UPID:
# return self._api().stop_machine(vmid)
self.assertEqual(provider.start_machine(1), fixtures.UPID)
api.start_machine.assert_called_once_with(1)
# def reset_machine(self, vmid: int) -> client.types.UPID:
# return self._api().reset_machine(vmid)
self.assertEqual(provider.stop_machine(1), fixtures.UPID)
api.stop_machine.assert_called_once_with(1)
# def suspend_machine(self, vmId: int) -> client.types.UPID:
# return self._api().suspend_machine(vmId)
self.assertEqual(provider.reset_machine(1), fixtures.UPID)
api.reset_machine.assert_called_once_with(1)
# def shutdown_machine(self, vmid: int) -> client.types.UPID:
# return self._api().shutdown_machine(vmid)
self.assertEqual(provider.suspend_machine(1), fixtures.UPID)
api.suspend_machine.assert_called_once_with(1)
# def remove_machine(self, vmid: int) -> client.types.UPID:
# return self._api().remove_machine(vmid)
self.assertEqual(provider.shutdown_machine(1), fixtures.UPID)
api.shutdown_machine.assert_called_once_with(1)
# def get_task_info(self, node: str, upid: str) -> client.types.TaskStatus:
# return self._api().get_task(node, upid)
self.assertEqual(provider.remove_machine(1), fixtures.UPID)
api.remove_machine.assert_called_once_with(1)
# def enable_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
# self._api().enable_machine_ha(vmid, started, group)
self.assertEqual(provider.get_task_info('node', 'upid'), fixtures.TASK_STATUS)
api.get_task.assert_called_once_with('node', 'upid')
# def set_machine_mac(self, vmid: int, macAddress: str) -> None:
# self._api().set_machine_ha(vmid, macAddress)
provider.enable_ha(1, True, 'group')
api.enable_machine_ha.assert_called_once_with(1, True, 'group')
# def disable_ha(self, vmid: int) -> None:
# self._api().disable_machine_ha(vmid)
provider.set_machine_mac(1, 'mac')
api.set_machine_ha.assert_called_once_with(1, 'mac')
# def set_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
# self._api().set_protection(vmid, node, protection)
provider.disable_ha(1)
api.disable_machine_ha.assert_called_once_with(1)
# def list_ha_groups(self) -> list[str]:
# return self._api().list_ha_groups()
provider.set_protection(1, 'node', True)
api.set_protection.assert_called_once_with(1, 'node', True)
# def get_console_connection(
# self,
# machine_id: str,
# node: typing.Optional[str] = None,
# ) -> typing.Optional[types.services.ConsoleConnectionInfo]:
# return self._api().get_console_connection(int(machine_id), node)
self.assertEqual(provider.list_ha_groups(), fixtures.HA_GROUPS)
api.list_ha_groups.assert_called_once_with()
# def get_new_vmid(self) -> int:
# while True: # look for an unused VmId
# vmid = self._vmid_generator.get(self.start_vmid.as_int(), MAX_VMID)
# if self._api().is_vmid_available(vmid):
# return vmid
# # All assigned vmid will be left as unusable on UDS until released by time (3 years)
# # This is not a problem at all, in the rare case that a machine id is released from uds db
# # if it exists when we try to create a new one, we will simply try to get another one
self.assertEqual(provider.get_console_connection('1'), fixtures.CONSOLE_CONNECTION)
api.get_console_connection.assert_called_once_with(1, None)
# def get_guest_ip_address(self, vmid: int, node: typing.Optional[str] = None) -> str:
# return self._api().get_guest_ip_address(vmid, node)
vmid = provider.get_new_vmid()
for i in range(1, 128):
self.assertEqual(provider.get_new_vmid(), vmid + i)
# def supports_snapshot(self, vmid: int, node: typing.Optional[str] = None) -> bool:
# return self._api().supports_snapshot(vmid, node)
self.assertEqual(provider.get_guest_ip_address(1), fixtures.GUEST_IP_ADDRESS)
api.get_guest_ip_address.assert_called_once_with(1, None)
# def get_current_snapshot(
# self, vmid: int, node: typing.Optional[str] = None
# ) -> typing.Optional[client.types.SnapshotInfo]:
# return (
# sorted(
# filter(lambda x: x.snaptime, self._api().list_snapshots(vmid, node)),
# key=lambda x: x.snaptime or 0,
# reverse=True,
# )
# + [None]
# )[0]
self.assertEqual(provider.supports_snapshot(1), True)
api.supports_snapshot.assert_called_once_with(1, None)
# def create_snapshot(
# self,
# vmid: int,
# node: typing.Optional[str] = None,
# name: typing.Optional[str] = None,
# description: typing.Optional[str] = None,
# ) -> client.types.UPID:
# return self._api().create_snapshot(vmid, node, name, description)
api.list_snapshots.reset_mock()
self.assertEqual(provider.get_current_snapshot(1), fixtures.SNAPSHOTS_INFO[0])
api.list_snapshots.assert_called_once_with(1, None)
# def restore_snapshot(
# self, vmid: int, node: typing.Optional[str] = None, name: typing.Optional[str] = None
# ) -> client.types.UPID:
# """
# In fact snapshot is not optional, but node is and want to keep the same signature as the api
# """
# return self._api().restore_snapshot(vmid, node, name)
self.assertEqual(provider.create_snapshot(1), fixtures.UPID)
api.create_snapshot.assert_called_once_with(1, None, None, None)
provider.restore_snapshot(1, 'node', 'name')
api.restore_snapshot.assert_called_once_with(1, 'node', 'name')