1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-03 13:47:14 +03:00

Cleaning and refactoring proxmox, and fixed tests

This commit is contained in:
Adolfo Gómez García 2024-07-06 02:31:05 +02:00
parent 28ed59e185
commit a1499b0895
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
17 changed files with 596 additions and 353 deletions

View File

@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import copy
import functools
import typing
import datetime
@ -55,12 +56,12 @@ from uds.services.Proxmox import (
from uds.services.Proxmox.proxmox import types as prox_types
NODES: typing.Final[list[prox_types.Node]] = [
DEF_NODES: list[prox_types.Node] = [
prox_types.Node(name='node0', online=True, local=True, nodeid=1, ip='0.0.0.1', level='level', id='id'),
prox_types.Node(name='node1', online=True, local=True, nodeid=2, ip='0.0.0.2', level='level', id='id'),
]
NODE_STATS: typing.Final[list[prox_types.NodeStats]] = [
DEF_NODE_STATS: list[prox_types.NodeStats] = [
prox_types.NodeStats(
name='name',
status='status',
@ -90,14 +91,14 @@ NODE_STATS: typing.Final[list[prox_types.NodeStats]] = [
]
CLUSTER_INFO: typing.Final[prox_types.ClusterInfo] = prox_types.ClusterInfo(
DEF_CLUSTER_INFO: prox_types.ClusterInfo = prox_types.ClusterInfo(
cluster=prox_types.Cluster(name='name', version='version', id='id', nodes=2, quorate=1),
nodes=NODES,
nodes=DEF_NODES,
)
STORAGES: typing.Final[list[prox_types.StorageInfo]] = [
DEF_STORAGES: list[prox_types.StorageInfo] = [
prox_types.StorageInfo(
node=NODES[i % len(NODES)].name,
node=DEF_NODES[i % len(DEF_NODES)].name,
storage=f'storage_{i}',
content=(f'content{i}',) * (i % 3),
type='images',
@ -112,7 +113,7 @@ STORAGES: typing.Final[list[prox_types.StorageInfo]] = [
]
VGPUS: typing.Final[list[prox_types.VGPUInfo]] = [
DEF_VGPUS: list[prox_types.VGPUInfo] = [
prox_types.VGPUInfo(
name='name_1',
description='description_1',
@ -136,18 +137,18 @@ VGPUS: typing.Final[list[prox_types.VGPUInfo]] = [
),
]
HA_GROUPS: typing.Final[list[str]] = [
DEF_HA_GROUPS: list[str] = [
'ha_group_1',
'ha_group_2',
'ha_group_3',
'ha_group_4',
]
VMS_INFO: list[prox_types.VMInfo] = [
DEF_VMS_INFO: list[prox_types.VMInfo] = [
prox_types.VMInfo(
status='stopped',
vmid=i,
node=NODES[i % len(NODES)].name,
node=DEF_NODES[i % len(DEF_NODES)].name,
template=True,
agent='agent',
cpus=1,
@ -165,12 +166,12 @@ VMS_INFO: list[prox_types.VMInfo] = [
netout=1,
diskread=1,
diskwrite=1,
vgpu_type=VGPUS[i % len(VGPUS)].type,
vgpu_type=DEF_VGPUS[i % len(DEF_VGPUS)].type,
)
for i in range(1, 16)
]
VMS_CONFIGURATION: typing.Final[list[prox_types.VMConfiguration]] = [
DEF_VMS_CONFIGURATION: list[prox_types.VMConfiguration] = [
prox_types.VMConfiguration(
name=f'vm_name_{i}',
vga='cirrus',
@ -190,26 +191,26 @@ VMS_CONFIGURATION: typing.Final[list[prox_types.VMConfiguration]] = [
]
UPID: typing.Final[prox_types.UPID] = prox_types.UPID(
node=NODES[0].name,
DEF_UPID: prox_types.UPID = prox_types.UPID(
node=DEF_NODES[0].name,
pid=1,
pstart=1,
starttime=datetime.datetime.now(),
type='type',
vmid=VMS_INFO[0].vmid,
vmid=DEF_VMS_INFO[0].vmid,
user='user',
upid='upid',
)
VM_CREATION_RESULT: typing.Final[prox_types.VmCreationResult] = prox_types.VmCreationResult(
node=NODES[0].name,
vmid=VMS_INFO[0].vmid,
upid=UPID,
DEF_VM_CREATION_RESULT: prox_types.VmCreationResult = prox_types.VmCreationResult(
node=DEF_NODES[0].name,
vmid=DEF_VMS_INFO[0].vmid,
upid=DEF_UPID,
)
SNAPSHOTS_INFO: typing.Final[list[prox_types.SnapshotInfo]] = [
DEF_SNAPSHOTS_INFO: list[prox_types.SnapshotInfo] = [
prox_types.SnapshotInfo(
name=f'snap_name_{i}',
description=f'snap desription{i}',
@ -220,8 +221,8 @@ SNAPSHOTS_INFO: typing.Final[list[prox_types.SnapshotInfo]] = [
for i in range(10)
]
TASK_STATUS = prox_types.TaskStatus(
node=NODES[0].name,
DEF_TASK_STATUS = prox_types.TaskStatus(
node=DEF_NODES[0].name,
pid=1,
pstart=1,
starttime=datetime.datetime.now(),
@ -233,43 +234,82 @@ TASK_STATUS = prox_types.TaskStatus(
id='id',
)
POOL_MEMBERS: typing.Final[list[prox_types.PoolMemberInfo]] = [
DEF_POOL_MEMBERS: list[prox_types.PoolMemberInfo] = [
prox_types.PoolMemberInfo(
id=f'id_{i}',
node=NODES[i % len(NODES)].name,
storage=STORAGES[i % len(STORAGES)].storage,
node=DEF_NODES[i % len(DEF_NODES)].name,
storage=DEF_STORAGES[i % len(DEF_STORAGES)].storage,
type='type',
vmid=VMS_INFO[i % len(VMS_INFO)].vmid,
vmname=VMS_INFO[i % len(VMS_INFO)].name or '',
vmid=DEF_VMS_INFO[i % len(DEF_VMS_INFO)].vmid,
vmname=DEF_VMS_INFO[i % len(DEF_VMS_INFO)].name or '',
)
for i in range(10)
]
POOLS: typing.Final[list[prox_types.PoolInfo]] = [
DEF_POOLS: list[prox_types.PoolInfo] = [
prox_types.PoolInfo(
poolid=f'pool_{i}',
comments=f'comments_{i}',
members=POOL_MEMBERS,
members=DEF_POOL_MEMBERS,
)
for i in range(10)
]
GUEST_IP_ADDRESS: typing.Final[str] = '1.0.0.1'
DEF_GUEST_IP_ADDRESS: str = '1.0.0.1'
CONSOLE_CONNECTION_INFO: typing.Final[types.services.ConsoleConnectionInfo] = (
types.services.ConsoleConnectionInfo(
type='spice',
address=GUEST_IP_ADDRESS,
port=5900,
secure_port=5901,
cert_subject='',
ticket=types.services.ConsoleConnectionTicket(value='ticket'),
ca='',
proxy='',
monitors=1,
)
DEF_CONSOLE_CONNECTION_INFO: types.services.ConsoleConnectionInfo = types.services.ConsoleConnectionInfo(
type='spice',
address=DEF_GUEST_IP_ADDRESS,
port=5900,
secure_port=5901,
cert_subject='',
ticket=types.services.ConsoleConnectionTicket(value='ticket'),
ca='',
proxy='',
monitors=1,
)
# clone values to avoid modifying the original ones
NODES: list[prox_types.Node] = copy.deepcopy(DEF_NODES)
NODE_STATS: list[prox_types.NodeStats] = copy.deepcopy(DEF_NODE_STATS)
CLUSTER_INFO: prox_types.ClusterInfo = copy.deepcopy(DEF_CLUSTER_INFO)
STORAGES: list[prox_types.StorageInfo] = copy.deepcopy(DEF_STORAGES)
VGPUS: list[prox_types.VGPUInfo] = copy.deepcopy(DEF_VGPUS)
HA_GROUPS: list[str] = copy.deepcopy(DEF_HA_GROUPS)
VMS_INFO: list[prox_types.VMInfo] = copy.deepcopy(DEF_VMS_INFO)
VMS_CONFIGURATION: list[prox_types.VMConfiguration] = copy.deepcopy(DEF_VMS_CONFIGURATION)
UPID: prox_types.UPID = copy.deepcopy(DEF_UPID)
VM_CREATION_RESULT: prox_types.VmCreationResult = copy.deepcopy(DEF_VM_CREATION_RESULT)
SNAPSHOTS_INFO: list[prox_types.SnapshotInfo] = copy.deepcopy(DEF_SNAPSHOTS_INFO)
TASK_STATUS: prox_types.TaskStatus = copy.deepcopy(DEF_TASK_STATUS)
POOLS: list[prox_types.PoolInfo] = copy.deepcopy(DEF_POOLS)
GUEST_IP_ADDRESS: str = copy.deepcopy(DEF_GUEST_IP_ADDRESS)
CONSOLE_CONNECTION_INFO: types.services.ConsoleConnectionInfo = copy.deepcopy(DEF_CONSOLE_CONNECTION_INFO)
def clear() -> None:
    """
    Reset all mutable module-level fixture values to their ``DEF_*`` defaults.

    Lists are refreshed in place via slice assignment so that any references
    already held by tests keep seeing the restored data; non-list objects are
    rebound through the ``global`` statement instead.
    """
    global CLUSTER_INFO, UPID, VM_CREATION_RESULT, TASK_STATUS, GUEST_IP_ADDRESS, CONSOLE_CONNECTION_INFO

    NODES[:] = copy.deepcopy(DEF_NODES)
    NODE_STATS[:] = copy.deepcopy(DEF_NODE_STATS)
    CLUSTER_INFO = copy.deepcopy(DEF_CLUSTER_INFO)  # pyright: ignore
    # NOTE: the original reset STORAGES twice in a row; the duplicate line was removed
    STORAGES[:] = copy.deepcopy(DEF_STORAGES)
    VGPUS[:] = copy.deepcopy(DEF_VGPUS)
    HA_GROUPS[:] = copy.deepcopy(DEF_HA_GROUPS)
    VMS_INFO[:] = copy.deepcopy(DEF_VMS_INFO)
    VMS_CONFIGURATION[:] = copy.deepcopy(DEF_VMS_CONFIGURATION)
    UPID = copy.deepcopy(DEF_UPID)  # pyright: ignore
    VM_CREATION_RESULT = copy.deepcopy(DEF_VM_CREATION_RESULT)  # pyright: ignore
    SNAPSHOTS_INFO[:] = copy.deepcopy(DEF_SNAPSHOTS_INFO)
    TASK_STATUS = copy.deepcopy(DEF_TASK_STATUS)  # pyright: ignore
    POOLS[:] = copy.deepcopy(DEF_POOLS)
    GUEST_IP_ADDRESS = copy.deepcopy(DEF_GUEST_IP_ADDRESS)  # pyright: ignore
    CONSOLE_CONNECTION_INFO = copy.deepcopy(DEF_CONSOLE_CONNECTION_INFO)  # pyright: ignore
def replace_vm_info(vmid: int, **kwargs: typing.Any) -> prox_types.UPID:
"""
@ -277,7 +317,8 @@ def replace_vm_info(vmid: int, **kwargs: typing.Any) -> prox_types.UPID:
"""
for i in range(len(VMS_INFO)):
if VMS_INFO[i].vmid == vmid:
VMS_INFO[i] = VMS_INFO[i]._replace(**kwargs)
for k, v in kwargs.items():
setattr(VMS_INFO[i], k, v)
break
return UPID
@ -286,39 +327,62 @@ def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., prox_types.UP
return functools.partial(replace_vm_info, **kwargs)
T = typing.TypeVar('T')


def returner(value: T, *args: typing.Any, **kwargs: typing.Any) -> typing.Callable[..., T]:
    """Build a callable that ignores whatever it is called with and always yields *value*."""

    def _always(*_args: typing.Any, **_kwargs: typing.Any) -> T:
        return value

    return _always
# Methods that return None or "internal" methods are not tested
CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
CLIENT_METHODS_INFO: list[AutoSpecMethodInfo] = [
# connect returns None
# Test method
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.test, returns=True),
# get_cluster_info
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_cluster_info, returns=CLUSTER_INFO),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_cluster_info, returns=CLUSTER_INFO
),
# get_next_vmid
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_next_vmid, returns=1),
# is_vmid_available
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.is_vmid_available, returns=True),
# get_node_networks, never called (ensure it's not called by mistake)
# list_node_gpu_devices
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_node_gpu_devices, returns=['gpu_dev_1', 'gpu_dev_2']),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.list_node_gpu_devices,
returns=['gpu_dev_1', 'gpu_dev_2'],
),
# list_node_vgpus
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_node_vgpus, returns=VGPUS),
# node_has_vgpus_available
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.node_has_vgpus_available, returns=True),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.node_has_vgpus_available, returns=True
),
# get_best_node_for_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_best_node_for_machine, returns=NODE_STATS[0]),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_best_node_for_vm, returns=NODE_STATS[0]
),
# clone_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.clone_machine, returns=VM_CREATION_RESULT),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.clone_vm, returns=VM_CREATION_RESULT),
# list_ha_groups
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_ha_groups, returns=HA_GROUPS),
# enable_machine_ha return None
# disable_machine_ha return None
# set_protection return None
# get_guest_ip_address
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_guest_ip_address, returns=GUEST_IP_ADDRESS),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_guest_ip_address, returns=GUEST_IP_ADDRESS
),
# remove_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.remove_machine, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.delete_vm, returns=UPID),
# list_snapshots
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_snapshots, returns=SNAPSHOTS_INFO),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.list_snapshots, returns=SNAPSHOTS_INFO
),
# supports_snapshot
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.supports_snapshot, returns=True),
# create_snapshot
@ -328,42 +392,63 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
# restore_snapshot
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.restore_snapshot, returns=UPID),
# get_task
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_task, returns=TASK_STATUS),
# list_machines
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_machines, returns=VMS_INFO),
# get_machine_pool_info
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_pool_info,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_task,
returns=lambda *args, **kwargs: TASK_STATUS, # pyright: ignore
),
# list_machines
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_vms, returns=VMS_INFO),
# get_vm_pool_info
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_vm_pool_info,
returns=lambda vmid, poolid, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
),
# get_machine_info
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_info,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_vm_info,
returns=lambda vmid, *args, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
),
# get_machine_configuration
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_configuration,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_vm_config,
returns=lambda vmid, **kwargs: VMS_CONFIGURATION[vmid - 1], # pyright: ignore
),
# enable_machine_ha return None
# start_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.start_machine, returns=replacer_vm_info(status='running')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.start_vm, returns=replacer_vm_info(status='running')
),
# stop_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.stop_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.stop_vm, returns=replacer_vm_info(status='stopped')
),
# reset_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.reset_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.reset_vm, returns=replacer_vm_info(status='stopped')
),
# suspend_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.suspend_machine, returns=replacer_vm_info(status='suspended')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.suspend_vm,
returns=replacer_vm_info(status='suspended'),
),
# resume_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.resume_machine, returns=replacer_vm_info(status='running')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.resume_vm,
returns=replacer_vm_info(status='running'),
),
# shutdown_machine
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.shutdown_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.shutdown_vm,
returns=replacer_vm_info(status='stopped'),
),
# convert_to_template
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.convert_to_template, returns=replacer_vm_info(template=True)),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.convert_vm_to_template,
returns=replacer_vm_info(template=True),
),
# get_storage
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_storage,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_storage_info,
returns=lambda storage, node, **kwargs: next( # pyright: ignore
filter(lambda s: s.storage == storage, STORAGES) # pyright: ignore
),
@ -394,12 +479,17 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
),
),
# get_console_connection
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_console_connection, returns=CONSOLE_CONNECTION_INFO),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_console_connection,
returns=CONSOLE_CONNECTION_INFO,
),
# journal
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.journal, returns=['journal line 1', 'journal line 2']),
AutoSpecMethodInfo(
uds.services.Proxmox.proxmox.client.ProxmoxClient.journal, returns=['journal line 1', 'journal line 2']
),
]
PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
PROVIDER_VALUES_DICT: gui.ValuesDictType = {
'host': 'host',
'port': 8006,
'username': 'username',
@ -412,7 +502,7 @@ PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
}
SERVICE_LINKED_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
SERVICE_LINKED_VALUES_DICT: gui.ValuesDictType = {
'pool': POOLS[0].poolid,
'ha': HA_GROUPS[0],
'try_soft_shutdown': False,
@ -447,10 +537,11 @@ def patched_provider(
) -> typing.Generator[provider.ProxmoxProvider, None, None]:
client = create_client_mock()
provider = create_provider(**kwargs)
with mock.patch.object(provider, '_api') as api:
with mock.patch.object(provider, 'api') as api:
api.return_value = client
yield provider
def create_provider(**kwargs: typing.Any) -> provider.ProxmoxProvider:
"""
Create a provider
@ -564,5 +655,5 @@ def create_userservice_linked(
# Other helpers
def set_all_vm_state(state: str) -> None:
# Set machine state for fixture to stopped
for i in range(len(VMS_INFO)):
VMS_INFO[i] = VMS_INFO[i]._replace(status=state)
for i in VMS_INFO:
i.status = state

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from uds.services.Proxmox import helpers
from . import fixtures
from ...utils.test import UDSTransactionTestCase
class TestProxmoxHelpers(UDSTransactionTestCase):
    """Tests for the Proxmox ``helpers`` module (form-filler callbacks)."""

    # Parameters dict as the helpers receive it from the GUI callback machinery.
    _parameters: dict[str, typing.Any] = {
        'prov_uuid': 'test',
        'machine': fixtures.VMS_INFO[0].vmid,  # Used on get_storage
        'pool': fixtures.POOLS[0].poolid,  # Used on get_machines
    }

    def test_get_provider(self) -> None:
        """get_provider must look the provider up by the ``prov_uuid`` parameter."""
        # with fixtures.patched_provider() as provider:
        #     pass
        with mock.patch('uds.models.Provider.objects.get') as get_provider:
            helpers.get_provider(self._parameters)
            get_provider.assert_called_once_with(uuid=self._parameters['prov_uuid'])

    def test_get_storage(self) -> None:
        """get_storage must return one 'datastore' choices field built from the mocked API."""
        with fixtures.patched_provider() as provider:
            with mock.patch('uds.models.Provider.objects.get') as get_provider:
                api = typing.cast(mock.Mock, provider.api())
                get_provider.return_value.get_instance.return_value = provider
                result = helpers.get_storage(self._parameters)
                self.assertEqual(len(result), 1)
                self.assertEqual(result[0]['name'], 'datastore')
                choices = result[0]['choices']
                self.assertIsInstance(choices, list)
                self.assertGreaterEqual(len(choices), 1)
                # Every choice entry must be a dict with string id/text fields
                for choice in choices:
                    self.assertIsInstance(choice, dict)
                    self.assertIsInstance(choice['id'], str)
                    self.assertIsInstance(choice['text'], str)
                # The helper is expected to hit these two API endpoints exactly once
                api.get_vm_pool_info.assert_called_once()
                api.list_storages.assert_called_once()

    def test_get_machines(self) -> None:
        """get_machines must return one 'machines' choices field built from the mocked API."""
        with fixtures.patched_provider() as provider:
            with mock.patch('uds.models.Provider.objects.get') as get_provider:
                api = typing.cast(mock.Mock, provider.api())
                get_provider.return_value.get_instance.return_value = provider
                result = helpers.get_machines(self._parameters)
                self.assertEqual(len(result), 1)
                self.assertEqual(result[0]['name'], 'machines')
                choices = result[0]['choices']
                self.assertIsInstance(choices, list)
                self.assertGreaterEqual(len(choices), 1)
                # Every choice entry must be a dict with string id/text fields
                for choice in choices:
                    self.assertIsInstance(choice, dict)
                    self.assertIsInstance(choice['id'], str)
                    self.assertIsInstance(choice['text'], str)
                # Pool members are fetched through the pool-info endpoint, once
                api.get_pool_info.assert_called_once()

View File

@ -79,7 +79,7 @@ class TestProxmoxProvider(UDSTransactionTestCase):
Test the provider
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
for ret_val in [True, False]:
api.test.reset_mock()
# Mock test_connection to return ret_val
@ -106,7 +106,7 @@ class TestProxmoxProvider(UDSTransactionTestCase):
This is "special" because it uses cache
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
# First, true result
self.assertEqual(provider.is_available(), True)
@ -128,28 +128,28 @@ class TestProxmoxProvider(UDSTransactionTestCase):
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
self.assertEqual(provider.test_connection(), True)
api.test.assert_called_once_with()
self.assertEqual(provider.list_machines(force=True), fixtures.VMS_INFO)
api.list_machines.assert_called_once_with(force=True)
api.list_machines.reset_mock()
self.assertEqual(provider.list_machines(), fixtures.VMS_INFO)
api.list_machines.assert_called_once_with(force=False)
self.assertEqual(provider.list_vms(force=True), fixtures.VMS_INFO)
api.list_vms.assert_called_once_with(force=True)
api.list_vms.reset_mock()
self.assertEqual(provider.list_vms(), fixtures.VMS_INFO)
api.list_vms.assert_called_once_with(force=False)
self.assertEqual(provider.get_vm_info(1), fixtures.VMS_INFO[0])
api.get_machine_pool_info.assert_called_once_with(1, None, force=True)
api.get_vm_pool_info.assert_called_once_with(1, None, force=True)
self.assertEqual(provider.get_vm_config(1), fixtures.VMS_CONFIGURATION[0])
api.get_machine_configuration.assert_called_once_with(1, force=True)
api.get_vm_config.assert_called_once_with(1, force=True)
self.assertEqual(
provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True),
fixtures.STORAGES[2],
)
api.get_storage.assert_called_once_with(
api.get_storage_info.assert_called_once_with(
fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=True
)
@ -158,13 +158,13 @@ class TestProxmoxProvider(UDSTransactionTestCase):
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
self.assertEqual(
provider.get_storage_info(fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node),
fixtures.STORAGES[2],
)
api.get_storage.assert_called_once_with(
api.get_storage_info.assert_called_once_with(
fixtures.STORAGES[2].storage, fixtures.STORAGES[2].node, force=False
)
@ -190,7 +190,7 @@ class TestProxmoxProvider(UDSTransactionTestCase):
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
self.assertEqual(
provider.get_pool_info(fixtures.POOLS[2].poolid, retrieve_vm_names=True, force=True),
@ -206,55 +206,55 @@ class TestProxmoxProvider(UDSTransactionTestCase):
)
provider.create_template(1)
api.convert_to_template.assert_called_once_with(1)
api.convert_vm_to_template.assert_called_once_with(1)
self.assertEqual(
provider.clone_vm(1, 'name', 'description', True, 'node', 'storage', 'pool', True),
fixtures.VM_CREATION_RESULT,
)
api.clone_machine.assert_called_once_with(
api.clone_vm.assert_called_once_with(
1, mock.ANY, 'name', 'description', True, 'node', 'storage', 'pool', True
)
self.assertEqual(provider.start_machine(1), fixtures.UPID)
api.start_machine.assert_called_once_with(1)
api.start_vm.assert_called_once_with(1)
self.assertEqual(provider.stop_machine(1), fixtures.UPID)
api.stop_machine.assert_called_once_with(1)
api.stop_vm.assert_called_once_with(1)
self.assertEqual(provider.reset_machine(1), fixtures.UPID)
api.reset_machine.assert_called_once_with(1)
api.reset_vm.assert_called_once_with(1)
self.assertEqual(provider.suspend_machine(1), fixtures.UPID)
api.suspend_machine.assert_called_once_with(1)
api.suspend_vm.assert_called_once_with(1)
def test_provider_methods_4(self) -> None:
"""
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
self.assertEqual(provider.shutdown_machine(1), fixtures.UPID)
api.shutdown_machine.assert_called_once_with(1)
api.shutdown_vm.assert_called_once_with(1)
self.assertEqual(provider.remove_machine(1), fixtures.UPID)
api.remove_machine.assert_called_once_with(1)
self.assertEqual(provider.delete_vm(1), fixtures.UPID)
api.delete_vm.assert_called_once_with(1)
self.assertEqual(provider.get_task_info('node', 'upid'), fixtures.TASK_STATUS)
api.get_task.assert_called_once_with('node', 'upid')
provider.enable_machine_ha(1, True, 'group')
api.enable_machine_ha.assert_called_once_with(1, True, 'group')
api.enable_vm_ha.assert_called_once_with(1, True, 'group')
provider.set_machine_mac(1, 'mac')
api.set_machine_mac.assert_called_once_with(1, 'mac')
api.set_vm_net_mac.assert_called_once_with(1, 'mac')
provider.disable_machine_ha(1)
api.disable_machine_ha.assert_called_once_with(1)
api.disable_vm_ha.assert_called_once_with(1)
provider.set_protection(1, 'node', True)
api.set_protection.assert_called_once_with(1, 'node', True)
api.set_vm_protection.assert_called_once_with(1, 'node', True)
self.assertEqual(provider.list_ha_groups(), fixtures.HA_GROUPS)
api.list_ha_groups.assert_called_once_with()
@ -264,7 +264,7 @@ class TestProxmoxProvider(UDSTransactionTestCase):
Test the provider methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
self.assertEqual(provider.get_console_connection('1'), fixtures.CONSOLE_CONNECTION_INFO)
api.get_console_connection.assert_called_once_with(1, None)
@ -288,37 +288,3 @@ class TestProxmoxProvider(UDSTransactionTestCase):
provider.restore_snapshot(1, 'node', 'name')
api.restore_snapshot.assert_called_once_with(1, 'node', 'name')
def test_helpers(self) -> None:
"""
Test the provider helpers
"""
from uds.services.Proxmox.helpers import get_storage, get_machines
with fixtures.patched_provider() as provider:
# Patch get_provider to return the ProxmoxProvider instance (provider)
with mock.patch('uds.services.Proxmox.helpers.get_provider', return_value=provider):
# Test get_storage
vm_info = provider.get_vm_info(1)
h_storage = get_storage({'prov_uuid': 'test', 'machine': '1'})
self.assertEqual(
list(
map(
lambda x: x['id'],
h_storage[0]['choices'],
)
),
list(map(lambda x: x.storage, filter(lambda x: x.node == vm_info.node, fixtures.STORAGES))),
)
h_machines = get_machines({'prov_uuid': 'test', 'pool': fixtures.POOLS[0].poolid})
# Test get_machines
self.assertEqual(
list(
map(
lambda x: x['id'],
h_machines[0]['choices'],
)
),
list(map(lambda x: str(x.vmid), fixtures.POOLS[0].members)),
)

View File

@ -44,10 +44,12 @@ from ...utils.helpers import limited_iterator
# Use transactional, used by publication access to db on "removal"
class TestProxmoxPublication(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clear()
def test_publication(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
publication = fixtures.create_publication(service=service)
@ -57,7 +59,7 @@ class TestProxmoxPublication(UDSTransactionTestCase):
state = publication.check_state()
self.assertEqual(state, types.states.State.RUNNING)
api.clone_machine.assert_called_once_with(
api.clone_vm.assert_called_once_with(
publication.service().machine.as_int(),
MustBeOfType(int),
MustBeOfType(str),
@ -76,14 +78,13 @@ class TestProxmoxPublication(UDSTransactionTestCase):
def test_publication_error(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
publication = fixtures.create_publication(service=service)
# Ensure state check returns error
api.get_task.return_value = fixtures.TASK_STATUS._replace(
status='stopped', exitstatus='ERROR, BOOM!'
)
fixtures.TASK_STATUS.status = 'stopped'
fixtures.TASK_STATUS.exitstatus = 'ERROR, BOOM!'
state = publication.publish()
self.assertEqual(state, types.states.State.RUNNING, f'State is not running: publication._queue={publication._queue}')
@ -93,7 +94,7 @@ class TestProxmoxPublication(UDSTransactionTestCase):
state = publication.check_state()
try:
api.clone_machine.assert_called_once_with(
api.clone_vm.assert_called_once_with(
publication.service().machine.as_int(),
MustBeOfType(int),
MustBeOfType(str),
@ -113,7 +114,7 @@ class TestProxmoxPublication(UDSTransactionTestCase):
def test_publication_destroy(self) -> None:
vmid = str(fixtures.VMS_INFO[0].vmid)
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
publication = fixtures.create_publication(service=service)
@ -123,12 +124,12 @@ class TestProxmoxPublication(UDSTransactionTestCase):
publication._vmid = vmid
state = publication.destroy()
self.assertEqual(state, types.states.State.RUNNING)
api.remove_machine.assert_called_once_with(publication.machine())
api.delete_vm.assert_called_once_with(publication.machine())
# Now, destroy again, should do nothing more
state = publication.destroy()
# Should not call again
api.remove_machine.assert_called_once_with(publication.machine())
api.delete_vm.assert_called_once_with(publication.machine())
self.assertEqual(state, types.states.State.RUNNING)
@ -136,17 +137,17 @@ class TestProxmoxPublication(UDSTransactionTestCase):
def test_publication_destroy_error(self) -> None:
vmid = str(fixtures.VMS_INFO[0].vmid)
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
publication = fixtures.create_publication(service=service)
# Now, destroy in fact will not return error, because it will
# queue the operation if failed, but api.remove_machine will be called anyway
# queue the operation if failed, but api.delete_vm will be called anyway
publication._vmid = vmid
api.remove_machine.side_effect = Exception('BOOM!')
api.delete_vm.side_effect = Exception('BOOM!')
publication._vmid = vmid
self.assertEqual(publication.destroy(), types.states.State.RUNNING)
api.remove_machine.assert_called_once_with(publication.machine())
api.delete_vm.assert_called_once_with(publication.machine())
# Ensure cancel calls destroy
with mock.patch.object(publication, 'destroy') as destroy:

View File

@ -46,7 +46,7 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
Test the provider
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_fixed(provider=provider)
self.assertTrue(service.is_avaliable())
@ -136,7 +136,7 @@ class TestProxmoxFixedService(UDSTransactionTestCase):
def test_process_snapshot(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_fixed(provider=provider)
vmid = typing.cast(list[str], fixtures.SERVICE_FIXED_VALUES_DICT['machines'])[0]

View File

@ -57,7 +57,7 @@ class TestProxmovLinkedService(UDSTestCase):
Test the provider
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
self.assertTrue(service.is_avaliable())
@ -75,15 +75,15 @@ class TestProxmovLinkedService(UDSTestCase):
def test_service_methods_1(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
# Sanitized name
self.assertEqual(service.sanitized_name('a.b.c$m1%233 2'), 'a-b-c-m1-233-2')
# Clone machine
self.assertEqual(service.clone_machine('name', 'description', 1), fixtures.VM_CREATION_RESULT)
api.clone_machine.assert_called_with(
self.assertEqual(service.clone_vm('name', 'description', 1), fixtures.VM_CREATION_RESULT)
api.clone_vm.assert_called_with(
1,
mock.ANY,
'name',
@ -95,8 +95,8 @@ class TestProxmovLinkedService(UDSTestCase):
None,
)
# Clone machine, for template
self.assertEqual(service.clone_machine('name', 'description'), fixtures.VM_CREATION_RESULT)
api.clone_machine.assert_called_with(
self.assertEqual(service.clone_vm('name', 'description'), fixtures.VM_CREATION_RESULT)
api.clone_vm.assert_called_with(
service.machine.as_int(),
mock.ANY,
'name',
@ -110,26 +110,26 @@ class TestProxmovLinkedService(UDSTestCase):
# Get machine info
self.assertEqual(service.get_vm_info(1), fixtures.VMS_INFO[0])
api.get_machine_pool_info.assert_called_with(1, service.pool.value, force=True)
api.get_vm_pool_info.assert_called_with(1, service.pool.value, force=True)
# Get nic mac
self.assertEqual(service.get_nic_mac(1), '00:01:02:03:04:05')
# remove machine, but this is from provider
self.assertEqual(service.provider().remove_machine(1), fixtures.UPID)
self.assertEqual(service.provider().delete_vm(1), fixtures.UPID)
# Enable HA
service.enable_vm_ha(1, True)
api.enable_machine_ha.assert_called_with(1, True, service.ha.value)
api.enable_vm_ha.assert_called_with(1, True, service.ha.value)
def test_service_methods_2(self) -> None:
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
# Disable HA
service.disable_vm_ha(1)
api.disable_machine_ha.assert_called_with(1)
api.disable_vm_ha.assert_called_with(1)
# Get basename
self.assertEqual(service.get_basename(), service.basename.value)

View File

@ -44,7 +44,7 @@ from ...utils.helpers import limited_iterator
# We use transactions on some related methods (storage access, etc...)
class TestProxmoxFixedUserService(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.set_all_vm_state('stopped')
fixtures.clear()
def test_userservice_fixed_user(self) -> None:
"""

View File

@ -46,14 +46,14 @@ from ...utils.helpers import limited_iterator
# We use transactions on some related methods (storage access, etc...)
class TestProxmoxLinkedUserService(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.set_all_vm_state('stopped')
fixtures.clear()
def test_userservice_linked_cache_l1(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
publication = userservice.publication()
@ -74,7 +74,7 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
api.clone_vm.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
@ -89,18 +89,18 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
# api.get_task should have been invoked at least once
self.assertTrue(api.get_task.called)
api.enable_machine_ha.assert_called()
api.enable_vm_ha.assert_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_machine.assert_called_with(vmid)
api.set_vm_net_mac.assert_called_with(vmid, userservice._mac)
api.get_vm_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_vm.assert_called_with(vmid)
def test_userservice_linked_cache_l2_no_ha(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
service.ha.value = '__' # Disabled
@ -126,7 +126,7 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
api.clone_vm.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
@ -142,21 +142,21 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
self.assertTrue(api.get_task.called)
# Shoud not have been called since HA is disabled
api.enable_machine_ha.assert_not_called()
api.enable_vm_ha.assert_not_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.set_vm_net_mac.assert_called_with(vmid, userservice._mac)
api.get_vm_pool_info.assert_called_with(vmid, service.pool.value, force=True)
# Now, start should have been called
api.start_machine.assert_called_with(vmid)
api.start_vm.assert_called_with(vmid)
# Stop machine should have been called
api.shutdown_machine.assert_called_with(vmid)
api.shutdown_vm.assert_called_with(vmid)
def test_userservice_linked_user(self) -> None:
"""
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
@ -181,7 +181,7 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
vmid = int(userservice._vmid)
api.clone_machine.assert_called_with(
api.clone_vm.assert_called_with(
publication.machine(),
mock.ANY,
userservice._name,
@ -196,11 +196,11 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
# api.get_task should have been invoked at least once
self.assertTrue(api.get_task.called)
api.enable_machine_ha.assert_called()
api.enable_vm_ha.assert_called()
api.set_machine_mac.assert_called_with(vmid, userservice._mac)
api.get_machine_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_machine.assert_called_with(vmid)
api.set_vm_net_mac.assert_called_with(vmid, userservice._mac)
api.get_vm_pool_info.assert_called_with(vmid, service.pool.value, force=True)
api.start_vm.assert_called_with(vmid)
# Ensure vm is stopped, because deployment should have started it (as api.start_machine was called)
fixtures.replace_vm_info(vmid, status='stopped')
@ -220,7 +220,7 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
Test the user service
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider._api())
api = typing.cast(mock.MagicMock, provider.api())
for graceful in [True, False]:
service = fixtures.create_service_linked(provider=provider)
userservice = fixtures.create_userservice_linked(service=service)
@ -231,9 +231,8 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
service.must_stop_before_deletion = False # Avoid stopping before deletion, not needed for this test
# Set machine state for fixture to started
fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='running') for i in range(len(fixtures.VMS_INFO))
]
for vminfo in fixtures.VMS_INFO:
vminfo.status = 'running'
state = userservice.deploy_for_user(models.User())
@ -280,17 +279,15 @@ class TestProxmoxLinkedUserService(UDSTransactionTestCase):
state = userservice.check_state()
if counter > 5:
# Set machine state for fixture to stopped
fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='stopped')
for i in range(len(fixtures.VMS_INFO))
]
for vminfo in fixtures.VMS_INFO:
vminfo.status = 'stopped'
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(state, types.states.TaskState.FINISHED, f'Extra info: {userservice._error_debug_info} {userservice._reason} {userservice._queue}')
if graceful:
api.shutdown_machine.assert_called()
api.shutdown_vm.assert_called()
else:
api.stop_machine.assert_called()
api.stop_vm.assert_called()
def test_userservice_basics(self) -> None:
with fixtures.patched_provider():

View File

@ -41,6 +41,11 @@ def hash_key(key: typing.Union[str, bytes]) -> str:
"""
Returns a hash of the given key
Return value should be, at most, 64 bytes long (as db field is 64 bytes long)
Currently used at least on:
* src/uds/core/util/cache.py
Note that replacing the algorithm used here will force to invalidate all previous generated
entries. In the case of cache, this is not a problem, but in other cases, it could be.
"""
if isinstance(key, str):
key = key.encode('utf-8')

View File

@ -190,7 +190,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
)
comments = 'UDS Linked clone'
task_result = self.service().clone_machine(name, comments, template_id)
task_result = self.service().clone_vm(name, comments, template_id)
self._store_task(task_result.upid)
self._vmid = str(task_result.vmid)

View File

@ -50,7 +50,7 @@ def get_storage(parameters: typing.Any) -> types.ui.CallbackResultType:
logger.debug('Parameters received by getResources Helper: %s', parameters)
provider = get_provider(parameters)
# Obtains datacenter from cluster
# Obtains machine info, to obtain the node and get the storages
try:
vm_info = provider.get_vm_info(int(parameters['machine']))
except Exception:

View File

@ -143,7 +143,7 @@ class ProxmoxDeferredRemoval(jobs.Job):
return
if vmInfo.status == 'stopped': # Machine exists, try to remove it now
ProxmoxDeferredRemoval.waitForTaskFinish(instance, instance.remove_machine(vmid))
ProxmoxDeferredRemoval.waitForTaskFinish(instance, instance.delete_vm(vmid))
# It this is reached, remove check
storage.remove('tr' + str(vmid))

View File

@ -122,7 +122,7 @@ class ProxmoxProvider(services.ServiceProvider):
_cached_api: typing.Optional[client.ProxmoxClient] = None
_vmid_generator: UniqueIDGenerator
def _api(self) -> client.ProxmoxClient:
def api(self) -> client.ProxmoxClient:
"""
Returns the connection API object
"""
@ -164,35 +164,35 @@ class ProxmoxProvider(services.ServiceProvider):
True if all went fine, false if id didn't
"""
return self._api().test()
return self.api().test()
def list_machines(self, force: bool = False) -> list[prox_types.VMInfo]:
return self._api().list_machines(force=force)
def list_vms(self, force: bool = False) -> list[prox_types.VMInfo]:
return self.api().list_vms(force=force)
def get_vm_info(self, vmid: int, poolid: typing.Optional[str] = None) -> prox_types.VMInfo:
return self._api().get_machine_pool_info(vmid, poolid, force=True)
return self.api().get_vm_pool_info(vmid, poolid, force=True)
def get_vm_config(self, vmid: int) -> prox_types.VMConfiguration:
return self._api().get_machine_configuration(vmid, force=True)
return self.api().get_vm_config(vmid, force=True)
def get_storage_info(self, storageid: str, node: str, force: bool = False) -> prox_types.StorageInfo:
return self._api().get_storage(storageid, node, force=force)
return self.api().get_storage_info(storageid, node, force=force)
def list_storages(
self, node: typing.Optional[str] = None, force: bool = False
) -> list[prox_types.StorageInfo]:
return self._api().list_storages(node=node, content='images', force=force)
return self.api().list_storages(node=node, content='images', force=force)
def list_pools(self, force: bool = False) -> list[prox_types.PoolInfo]:
return self._api().list_pools(force=force)
return self.api().list_pools(force=force)
def get_pool_info(
self, pool_id: str, retrieve_vm_names: bool = False, force: bool = False
) -> prox_types.PoolInfo:
return self._api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names, force=force)
return self.api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names, force=force)
def create_template(self, vmid: int) -> None:
self._api().convert_to_template(vmid)
self.api().convert_vm_to_template(vmid)
def clone_vm(
self,
@ -205,7 +205,7 @@ class ProxmoxProvider(services.ServiceProvider):
target_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> prox_types.VmCreationResult:
return self._api().clone_machine(
return self.api().clone_vm(
vmid,
self.get_new_vmid(),
name,
@ -218,54 +218,54 @@ class ProxmoxProvider(services.ServiceProvider):
)
def start_machine(self, vmid: int) -> prox_types.UPID:
return self._api().start_machine(vmid)
return self.api().start_vm(vmid)
def stop_machine(self, vmid: int) -> prox_types.UPID:
return self._api().stop_machine(vmid)
return self.api().stop_vm(vmid)
def reset_machine(self, vmid: int) -> prox_types.UPID:
return self._api().reset_machine(vmid)
return self.api().reset_vm(vmid)
def suspend_machine(self, vmId: int) -> prox_types.UPID:
return self._api().suspend_machine(vmId)
return self.api().suspend_vm(vmId)
def shutdown_machine(self, vmid: int) -> prox_types.UPID:
return self._api().shutdown_machine(vmid)
return self.api().shutdown_vm(vmid)
def remove_machine(self, vmid: int) -> prox_types.UPID:
return self._api().remove_machine(vmid)
def delete_vm(self, vmid: int) -> prox_types.UPID:
return self.api().delete_vm(vmid)
def get_task_info(self, node: str, upid: str) -> prox_types.TaskStatus:
return self._api().get_task(node, upid)
return self.api().get_task(node, upid)
def enable_machine_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
self._api().enable_machine_ha(vmid, started, group)
self.api().enable_vm_ha(vmid, started, group)
def set_machine_mac(self, vmid: int, macAddress: str) -> None:
self._api().set_machine_mac(vmid, macAddress)
self.api().set_vm_net_mac(vmid, macAddress)
def disable_machine_ha(self, vmid: int) -> None:
self._api().disable_machine_ha(vmid)
self.api().disable_vm_ha(vmid)
def set_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
self._api().set_protection(vmid, node, protection)
self.api().set_vm_protection(vmid, node, protection)
def list_ha_groups(self) -> list[str]:
return self._api().list_ha_groups()
return self.api().list_ha_groups()
def get_console_connection(
self,
vmid: str,
node: typing.Optional[str] = None,
) -> typing.Optional[types.services.ConsoleConnectionInfo]:
return self._api().get_console_connection(int(vmid), node)
return self.api().get_console_connection(int(vmid), node)
def get_new_vmid(self) -> int:
MAX_RETRIES: typing.Final[int] = 512 # So we don't loop forever, just in case...
vmid = 0
for _ in range(MAX_RETRIES):
vmid = self._vmid_generator.get(self.start_vmid.as_int(), MAX_VMID)
if self._api().is_vmid_available(vmid):
if self.api().is_vmid_available(vmid):
return vmid
# All assigned vmid will be left as unusable on UDS until released by time (3 years)
# This is not a problem at all, in the rare case that a machine id is released from uds db
@ -273,17 +273,17 @@ class ProxmoxProvider(services.ServiceProvider):
raise prox_exceptions.ProxmoxError(f'Could not get a new vmid!!: last tried {vmid}')
def get_guest_ip_address(self, vmid: int, node: typing.Optional[str] = None, ip_version: typing.Literal['4', '6', ''] = '') -> str:
return self._api().get_guest_ip_address(vmid, node, ip_version)
return self.api().get_guest_ip_address(vmid, node, ip_version)
def supports_snapshot(self, vmid: int, node: typing.Optional[str] = None) -> bool:
return self._api().supports_snapshot(vmid, node)
return self.api().supports_snapshot(vmid, node)
def get_current_snapshot(
self, vmid: int, node: typing.Optional[str] = None
) -> typing.Optional[prox_types.SnapshotInfo]:
return (
sorted(
filter(lambda x: x.snaptime, self._api().list_snapshots(vmid, node)),
filter(lambda x: x.snaptime, self.api().list_snapshots(vmid, node)),
key=lambda x: x.snaptime or 0,
reverse=True,
)
@ -297,7 +297,7 @@ class ProxmoxProvider(services.ServiceProvider):
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
) -> prox_types.UPID:
return self._api().create_snapshot(vmid, node, name, description)
return self.api().create_snapshot(vmid, node, name, description)
def restore_snapshot(
self, vmid: int, node: typing.Optional[str] = None, name: typing.Optional[str] = None
@ -305,11 +305,11 @@ class ProxmoxProvider(services.ServiceProvider):
"""
In fact snapshot is not optional, but node is and want to keep the same signature as the api
"""
return self._api().restore_snapshot(vmid, node, name)
return self.api().restore_snapshot(vmid, node, name)
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def is_available(self) -> bool:
return self._api().test()
return self.api().test()
def get_macs_range(self) -> str:
return self.macs_range.value

View File

@ -282,7 +282,7 @@ class ProxmoxClient:
)
@ensure_connected
def get_best_node_for_machine(
def get_best_node_for_vm(
self,
min_memory: int = 0,
must_have_vgpus: typing.Optional[bool] = None,
@ -320,7 +320,7 @@ class ProxmoxClient:
return best if best.status == 'online' else None
@ensure_connected
def clone_machine(
def clone_vm(
self,
vmid: int,
new_vmid: int,
@ -332,15 +332,15 @@ class ProxmoxClient:
use_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> types.VmCreationResult:
vmInfo = self.get_machine_info(vmid)
vmInfo = self.get_vm_info(vmid)
src_node = vmInfo.node
if not use_node:
logger.debug('Selecting best node')
# If storage is not shared, must be done on same as origin
if use_storage and self.get_storage(use_storage, vmInfo.node).shared:
node = self.get_best_node_for_machine(
if use_storage and self.get_storage_info(use_storage, vmInfo.node).shared:
node = self.get_best_node_for_vm(
min_memory=-1, must_have_vgpus=must_have_vgpus, mdev_type=vmInfo.vgpu_type
)
if node is None:
@ -399,7 +399,7 @@ class ProxmoxClient:
return [g['group'] for g in self._get('cluster/ha/groups')['data']]
@ensure_connected
def enable_machine_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
def enable_vm_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
"""
Enable high availability for a virtual machine.
@ -421,18 +421,18 @@ class ProxmoxClient:
)
@ensure_connected
def disable_machine_ha(self, vmid: int) -> None:
def disable_vm_ha(self, vmid: int) -> None:
try:
self._delete(f'cluster/ha/resources/vm%3A{vmid}')
except Exception:
logger.exception('removeFromHA')
@ensure_connected
def set_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
def set_vm_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
params: list[tuple[str, str]] = [
('protection', str(int(protection))),
]
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
self._post(f'nodes/{node}/qemu/{vmid}/config', data=params, node=node)
@ensure_connected
@ -441,7 +441,7 @@ class ProxmoxClient:
) -> str:
"""Returns the guest ip address of the specified machine"""
try:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
ifaces_list: list[dict[str, typing.Any]] = self._get(
f'nodes/{node}/qemu/{vmid}/agent/network-get-interfaces',
node=node,
@ -464,13 +464,13 @@ class ProxmoxClient:
raise exceptions.ProxmoxError('No ip address found for vm {}'.format(vmid))
@ensure_connected
def remove_machine(self, vmid: int, node: typing.Optional[str] = None, purge: bool = True) -> types.UPID:
node = node or self.get_machine_info(vmid).node
def delete_vm(self, vmid: int, node: typing.Optional[str] = None, purge: bool = True) -> types.UPID:
node = node or self.get_vm_info(vmid).node
return types.UPID.from_dict(self._delete(f'nodes/{node}/qemu/{vmid}?purge=1', node=node))
@ensure_connected
def list_snapshots(self, vmid: int, node: typing.Optional[str] = None) -> list[types.SnapshotInfo]:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
try:
return [
types.SnapshotInfo.from_dict(s)
@ -483,7 +483,7 @@ class ProxmoxClient:
@cached('snapshots', consts.CACHE_DURATION, key_helper=caching_key_helper)
def supports_snapshot(self, vmid: int, node: typing.Optional[str] = None) -> bool:
# If machine uses tpm, snapshots are not supported
return not self.get_machine_configuration(vmid, node).tpmstate0
return not self.get_vm_config(vmid, node).tpmstate0
@ensure_connected
def create_snapshot(
@ -496,7 +496,7 @@ class ProxmoxClient:
if self.supports_snapshot(vmid, node) is False:
raise exceptions.ProxmoxError('Machine does not support snapshots')
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
# Compose a sanitized name, without spaces and with a timestamp
name = name or f'UDS-{time.time()}'
params: list[tuple[str, str]] = [
@ -510,7 +510,7 @@ class ProxmoxClient:
def remove_snapshot(
self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
if name is None:
raise exceptions.ProxmoxError('Snapshot name is required')
return types.UPID.from_dict(self._delete(f'nodes/{node}/qemu/{vmid}/snapshot/{name}', node=node))
@ -519,7 +519,7 @@ class ProxmoxClient:
def restore_snapshot(
self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
if name is None:
raise exceptions.ProxmoxError('Snapshot name is required')
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/snapshot/{name}/rollback', node=node))
@ -532,7 +532,7 @@ class ProxmoxClient:
@cached('vms', consts.CACHE_DURATION, key_helper=caching_key_helper)
@ensure_connected
def list_machines(
def list_vms(
self, node: typing.Union[None, str, collections.abc.Iterable[str]] = None, **kwargs: typing.Any
) -> list[types.VMInfo]:
node_list: collections.abc.Iterable[str]
@ -553,7 +553,7 @@ class ProxmoxClient:
@cached('vmip', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
@ensure_connected
def get_machine_pool_info(
def get_vm_pool_info(
self, vmid: int, poolid: typing.Optional[str], **kwargs: typing.Any
) -> types.VMInfo:
# try to locate machine in pool
@ -570,11 +570,11 @@ class ProxmoxClient:
except Exception: # nosec: # If pool is not present, just use default getVmInfo
pass
return self.get_machine_info(vmid, node, **kwargs)
return self.get_vm_info(vmid, node, **kwargs)
@ensure_connected
@cached('vmin', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def get_machine_info(
def get_vm_info(
self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
) -> types.VMInfo:
nodes = [types.Node(node, False, False, 0, '', '', '')] if node else self.get_cluster_info().nodes
@ -597,21 +597,21 @@ class ProxmoxClient:
raise exceptions.ProxmoxNotFound(f'VM {vmid} not found')
@ensure_connected
def get_machine_configuration(
def get_vm_config(
self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
) -> types.VMConfiguration:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
return types.VMConfiguration.from_dict(self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data'])
@ensure_connected
def set_machine_mac(
def set_vm_net_mac(
self,
vmid: int,
mac: str,
netid: typing.Optional[str] = None,
node: typing.Optional[str] = None,
) -> None:
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
# First, read current configuration and extract network configuration
config = self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data']
if netid not in config:
@ -634,47 +634,49 @@ class ProxmoxClient:
)
@ensure_connected
def start_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
def start_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
# if exitstatus is "OK" or contains "already running", all is fine
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/start', node=node))
@ensure_connected
def stop_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.get_machine_info(vmid).node
def stop_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.get_vm_info(vmid).node
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/stop', node=node))
@ensure_connected
def reset_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.get_machine_info(vmid).node
def reset_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.get_vm_info(vmid).node
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/reset', node=node))
@ensure_connected
def suspend_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
def suspend_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
# Note: Suspend, in fact, invoques sets the machine state to "paused"
return self.shutdown_machine(vmid, node)
return self.shutdown_vm(vmid, node)
# node = node or self.get_machine_info(vmid).node
# return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/suspend', node=node))
@ensure_connected
def shutdown_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
def shutdown_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
# if exitstatus is "OK" or contains "already running", all is fine
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/shutdown', node=node))
@ensure_connected
def convert_to_template(self, vmid: int, node: typing.Optional[str] = None) -> None:
node = node or self.get_machine_info(vmid).node
def convert_vm_to_template(self, vmid: int, node: typing.Optional[str] = None) -> None:
node = node or self.get_vm_info(vmid).node
self._post(f'nodes/{node}/qemu/{vmid}/template', node=node)
# Ensure cache is reset for this VM (as it is now a template)
self.get_machine_info(vmid, force=True)
self.get_vm_info(vmid, force=True)
# proxmox has a "resume", but start works for suspended vm so we use it
resume_machine = start_machine
@ensure_connected
def resume_vm(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
return self.start_vm(vmid, node)
@ensure_connected
@cached('storage', consts.CACHE_DURATION, key_helper=caching_key_helper)
def get_storage(self, storage: str, node: str, **kwargs: typing.Any) -> types.StorageInfo:
def get_storage_info(self, storage: str, node: str, **kwargs: typing.Any) -> types.StorageInfo:
return types.StorageInfo.from_dict(
self._get(f'nodes/{node}/storage/{urllib.parse.quote(storage)}/status', node=node)['data']
)
@ -727,13 +729,9 @@ class ProxmoxClient:
if retrieve_vm_names:
for i in range(len(pool_info.members)):
try:
pool_info.members[i] = pool_info.members[i]._replace(
vmname=self.get_machine_info(pool_info.members[i].vmid).name or ''
)
pool_info.members[i].vmname = self.get_vm_info(pool_info.members[i].vmid).name or ''
except Exception:
pool_info.members[i] = pool_info.members[i]._replace(
vmname=f'VM-{pool_info.members[i].vmid}'
)
pool_info.members[i].vmname = f'VM-{pool_info.members[i].vmid}'
return pool_info
@ensure_connected
@ -743,7 +741,7 @@ class ProxmoxClient:
"""
Gets the connetion info for the specified machine
"""
node = node or self.get_machine_info(vmid).node
node = node or self.get_vm_info(vmid).node
res: dict[str, typing.Any] = self._post(f'nodes/{node}/qemu/{vmid}/spiceproxy', node=node)['data']
return core_types.services.ConsoleConnectionInfo(
type=res['type'],

View File

@ -1,45 +1,15 @@
import collections.abc
import dataclasses
import datetime
import re
import typing
import collections.abc
NETWORK_RE: typing.Final[typing.Pattern[str]] = re.compile(r'([a-zA-Z0-9]+)=([^,]+)') # May have vla id at end
# Conversor from dictionary to NamedTuple
CONVERSORS: typing.Final[dict[typing.Any, collections.abc.Callable[[typing.Type[typing.Any]], typing.Any]]] = {
str: lambda x: str(x or ''),
typing.Optional[str]: lambda x: str(x) if x is not None else None, # pyright: ignore
bool: lambda x: bool(x),
typing.Optional[bool]: lambda x: bool(x) if x is not None else None, # pyright: ignore
int: lambda x: int(x or '0'), # type: ignore
typing.Optional[int]: lambda x: int(x or '0') if x is not None else None, # type: ignore
float: lambda x: float(x or '0'), # type: ignore
typing.Optional[float]: lambda x: float(x or '0') if x is not None else None, # type: ignore
datetime.datetime: lambda x: datetime.datetime.fromtimestamp(int(x)), # type: ignore
typing.Optional[datetime.datetime]: lambda x: (
datetime.datetime.fromtimestamp(int(x)) if x is not None else None # type: ignore
),
}
def _from_dict(
type: type[typing.NamedTuple],
dictionary: collections.abc.MutableMapping[str, typing.Any],
extra: typing.Optional[collections.abc.Mapping[str, typing.Any]] = None,
) -> typing.Any:
extra = extra or {}
return type(
**{
k: typing.cast(
typing.Callable[..., typing.Any], CONVERSORS.get(type.__annotations__.get(k, str), lambda x: x)
)(dictionary.get(k, extra.get(k, None)))
for k in type._fields # pyright: ignore # _fields is a NamedTuple attribute that contains fields
}
)
# Need to be "NamedTuple"s because we use _fields attribute
class Cluster(typing.NamedTuple):
@dataclasses.dataclass
class Cluster:
name: str
version: str
id: str
@ -48,10 +18,17 @@ class Cluster(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'Cluster':
return _from_dict(Cluster, dictionary)
return Cluster(
name=dictionary.get('name', ''),
version=dictionary.get('version', ''),
id=dictionary.get('id', ''),
nodes=dictionary.get('nodes', 0),
quorate=dictionary.get('quorate', 0),
)
class Node(typing.NamedTuple):
@dataclasses.dataclass
class Node:
name: str
online: bool
local: bool
@ -62,10 +39,31 @@ class Node(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'Node':
return _from_dict(Node, dictionary)
return Node(
name=dictionary.get('name', ''),
online=dictionary.get('online', False),
local=dictionary.get('local', False),
nodeid=dictionary.get('nodeid', 0),
ip=dictionary.get('ip', ''),
level=dictionary.get('level', ''),
id=dictionary.get('id', ''),
)
@staticmethod
def null() -> 'Node':
return Node(
name='',
online=False,
local=False,
nodeid=0,
ip='',
level='',
id='',
)
class NodeStats(typing.NamedTuple):
@dataclasses.dataclass
class NodeStats:
name: str
status: str
uptime: int
@ -80,8 +78,19 @@ class NodeStats(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'NodeStats':
dictionary['name'] = dictionary['node']
return _from_dict(NodeStats, dictionary)
return NodeStats(
name=dictionary.get('node', ''),
status=dictionary.get('status', ''),
uptime=dictionary.get('uptime', 0),
disk=dictionary.get('disk', 0),
maxdisk=dictionary.get('maxdisk', 0),
level=dictionary.get('level', ''),
id=dictionary.get('id', ''),
mem=dictionary.get('mem', 0),
maxmem=dictionary.get('maxmem', 0),
cpu=dictionary.get('cpu', 0),
maxcpu=dictionary.get('maxcpu', 0),
)
@staticmethod
def null() -> 'NodeStats':
@ -100,7 +109,8 @@ class NodeStats(typing.NamedTuple):
)
class ClusterInfo(typing.NamedTuple):
@dataclasses.dataclass
class ClusterInfo:
cluster: typing.Optional[Cluster]
nodes: list[Node]
@ -118,7 +128,8 @@ class ClusterInfo(typing.NamedTuple):
return ClusterInfo(cluster=cluster, nodes=nodes)
class UPID(typing.NamedTuple):
@dataclasses.dataclass
class UPID:
node: str
pid: int
pstart: int
@ -144,7 +155,8 @@ class UPID(typing.NamedTuple):
)
class TaskStatus(typing.NamedTuple):
@dataclasses.dataclass
class TaskStatus:
node: str
pid: int
pstart: int
@ -158,7 +170,19 @@ class TaskStatus(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'TaskStatus':
return _from_dict(TaskStatus, dictionary['data'])
data = dictionary['data']
return TaskStatus(
node=data['node'],
pid=data['pid'],
pstart=data['pstart'],
starttime=datetime.datetime.fromtimestamp(data['starttime']),
type=data['type'],
status=data['status'],
exitstatus=data['exitstatus'],
user=data['user'],
upid=data['upid'],
id=dictionary['id'],
)
def is_running(self) -> bool:
return self.status == 'running'
@ -173,7 +197,8 @@ class TaskStatus(typing.NamedTuple):
return self.is_finished() and not self.is_completed()
class NetworkConfiguration(typing.NamedTuple):
@dataclasses.dataclass
class NetworkConfiguration:
net: str
type: str
mac: str
@ -188,7 +213,8 @@ class NetworkConfiguration(typing.NamedTuple):
return NetworkConfiguration(net=net, type=type, mac=mac)
class VMInfo(typing.NamedTuple):
@dataclasses.dataclass
class VMInfo:
status: str
vmid: int
node: str
@ -226,12 +252,33 @@ class VMInfo(typing.NamedTuple):
if vgpu_type is not None:
break # Already found it, stop looking
data = _from_dict(VMInfo, dictionary, {'vgpu_type': vgpu_type})
return data
return VMInfo(
status=dictionary.get('status', ''),
vmid=dictionary.get('vmid', 0),
node=dictionary.get('node', ''),
template=dictionary.get('template', False),
agent=dictionary.get('agent', None),
cpus=dictionary.get('cpus', None),
lock=dictionary.get('lock', None),
disk=dictionary.get('disk', None),
maxdisk=dictionary.get('maxdisk', None),
mem=dictionary.get('mem', None),
maxmem=dictionary.get('maxmem', None),
name=dictionary.get('name', None),
pid=dictionary.get('pid', None),
qmpstatus=dictionary.get('qmpstatus', None),
tags=dictionary.get('tags', None),
uptime=dictionary.get('uptime', None),
netin=dictionary.get('netin', None),
netout=dictionary.get('netout', None),
diskread=dictionary.get('diskread', None),
diskwrite=dictionary.get('diskwrite', None),
vgpu_type=vgpu_type,
)
class VMConfiguration(typing.NamedTuple):
@dataclasses.dataclass
class VMConfiguration:
name: str
vga: str
sockets: int
@ -244,23 +291,34 @@ class VMConfiguration(typing.NamedTuple):
template: bool
@staticmethod
def from_dict(src: collections.abc.MutableMapping[str, typing.Any]) -> 'VMConfiguration':
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMConfiguration':
nets: list[NetworkConfiguration] = []
for k in src.keys():
for k in dictionary.keys():
if k[:3] == 'net':
nets.append(NetworkConfiguration.from_str(k, src[k]))
nets.append(NetworkConfiguration.from_str(k, dictionary[k]))
src['networks'] = nets
return _from_dict(VMConfiguration, src)
return VMConfiguration(
name=dictionary.get('name', ''),
vga=dictionary.get('vga', ''),
sockets=dictionary.get('sockets', 0),
cores=dictionary.get('cores', 0),
vmgenid=dictionary.get('vmgenid', ''),
digest=dictionary.get('digest', ''),
networks=nets,
tpmstate0=dictionary.get('tpmstate0', ''),
template=dictionary.get('template', False),
)
class VmCreationResult(typing.NamedTuple):
@dataclasses.dataclass
class VmCreationResult:
node: str
vmid: int
upid: UPID
class StorageInfo(typing.NamedTuple):
@dataclasses.dataclass
class StorageInfo:
node: str
storage: str
content: tuple[str, ...]
@ -275,10 +333,22 @@ class StorageInfo(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'StorageInfo':
return _from_dict(StorageInfo, dictionary)
return StorageInfo(
node=dictionary.get('node', ''),
storage=dictionary.get('storage', ''),
content=tuple(dictionary.get('content', [])),
type=dictionary.get('type', ''),
shared=dictionary.get('shared', False),
active=dictionary.get('active', False),
used=dictionary.get('used', 0),
avail=dictionary.get('avail', 0),
total=dictionary.get('total', 0),
used_fraction=dictionary.get('used_fraction', 0),
)
class PoolMemberInfo(typing.NamedTuple):
@dataclasses.dataclass
class PoolMemberInfo:
id: str
node: str
storage: str
@ -288,10 +358,18 @@ class PoolMemberInfo(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'PoolMemberInfo':
return _from_dict(PoolMemberInfo, dictionary)
return PoolMemberInfo(
id=dictionary.get('id', ''),
node=dictionary.get('node', ''),
storage=dictionary.get('storage', ''),
type=dictionary.get('type', ''),
vmid=dictionary.get('vmid', 0),
vmname=dictionary.get('vmname', ''),
)
class PoolInfo(typing.NamedTuple):
@dataclasses.dataclass
class PoolInfo:
poolid: str
comments: str
members: list[PoolMemberInfo]
@ -303,13 +381,15 @@ class PoolInfo(typing.NamedTuple):
else:
members = []
dictionary['comments'] = dictionary.get('comments', '')
dictionary['members'] = members
return _from_dict(PoolInfo, dictionary=dictionary)
return PoolInfo(
poolid=dictionary.get('poolid', ''),
comments=dictionary.get('comments', ''),
members=members,
)
class SnapshotInfo(typing.NamedTuple):
@dataclasses.dataclass
class SnapshotInfo:
name: str
description: str
@ -319,10 +399,17 @@ class SnapshotInfo(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'SnapshotInfo':
return _from_dict(SnapshotInfo, dictionary)
return SnapshotInfo(
name=dictionary.get('name', ''),
description=dictionary.get('description', ''),
parent=dictionary.get('parent', None),
snaptime=dictionary.get('snaptime', None),
vmstate=dictionary.get('vmstate', None),
)
class VGPUInfo(typing.NamedTuple):
@dataclasses.dataclass
class VGPUInfo:
name: str
description: str
device: str
@ -331,4 +418,10 @@ class VGPUInfo(typing.NamedTuple):
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VGPUInfo':
return _from_dict(VGPUInfo, dictionary)
return VGPUInfo(
name=dictionary.get('name', ''),
description=dictionary.get('description', ''),
device=dictionary.get('device', ''),
available=dictionary.get('available', False),
type=dictionary.get('type', ''),
)

View File

@ -98,7 +98,7 @@ class ProxmoxPublication(DynamicPublication, autoserializable.AutoSerializable):
def op_create(self) -> None:
# First we should create a full clone, so base machine do not get fullfilled with "garbage" delta disks...
# Name is generated on op_initialize by DynamicPublication
task = self.service().clone_machine(self._name, self.generate_annotation())
task = self.service().clone_vm(self._name, self.generate_annotation())
self._vmid = str(task.vmid)
self._task = ','.join((task.upid.node, task.upid.upid))

View File

@ -185,7 +185,7 @@ class ProxmoxServiceLinked(DynamicService):
self.machine.set_choices(
[
gui.choice_item(str(m.vmid), f'{m.node}\\{m.name or m.vmid} ({m.vmid})')
for m in self.provider().list_machines()
for m in self.provider().list_vms()
if m.name and m.name[:3] != 'UDS'
]
)
@ -207,7 +207,7 @@ class ProxmoxServiceLinked(DynamicService):
"""
return re.sub("[^a-zA-Z0-9_-]", "-", name)
def clone_machine(self, name: str, description: str, vmid: int = -1) -> 'prox_types.VmCreationResult':
def clone_vm(self, name: str, description: str, vmid: int = -1) -> 'prox_types.VmCreationResult':
name = self.sanitized_name(name)
pool = self.pool.value or None
if vmid == -1: # vmId == -1 if cloning for template
@ -247,7 +247,7 @@ class ProxmoxServiceLinked(DynamicService):
self.do_log(level=types.log.LogLevel.WARNING, message=f'Exception disabling HA for vm {vmid}: {e}')
# And remove it
return self.provider().remove_machine(vmid)
return self.provider().delete_vm(vmid)
def enable_vm_ha(self, vmid: int, started: bool = False) -> None:
if self.ha.value == '__':
@ -319,4 +319,4 @@ class ProxmoxServiceLinked(DynamicService):
def execute_delete(self, vmid: str) -> None:
# All removals are deferred, so we can do it async
# Try to stop it if already running... Hard stop
self.provider().remove_machine(int(vmid))
self.provider().delete_vm(int(vmid))