1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

chore: Update ProxmoxClient method name from remove_snapshot to delete_snapshot

Added some more tests and fixes
This commit is contained in:
Adolfo Gómez García 2024-08-20 04:09:54 +02:00
parent 4bc3947dc1
commit fd66a226da
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 218 additions and 41 deletions

View File

@ -399,7 +399,7 @@ CLIENT_METHODS_INFO: list[AutoSpecMethodInfo] = [
# create_snapshot
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.create_snapshot, returns=UPID),
# remove_snapshot
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.remove_snapshot, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.delete_snapshot, returns=UPID),
# restore_snapshot
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.restore_snapshot, returns=UPID),
# get_task

View File

@ -34,10 +34,12 @@ import random
import time
import typing
import logging
import contextlib
from uds.services.Proxmox.proxmox import (
types as prox_types,
client as prox_client,
exceptions as prox_exceptions,
)
from tests.utils import vars
@ -55,6 +57,7 @@ class TestProxmoxClient(UDSTransactionTestCase):
test_vm: prox_types.VMInfo = prox_types.VMInfo.null()
pool: prox_types.PoolInfo = prox_types.PoolInfo.null()
storage: prox_types.StorageInfo = prox_types.StorageInfo.null()
hagroup: str = ''
def setUp(self) -> None:
v = vars.get_vars('proxmox')
@ -80,11 +83,22 @@ class TestProxmoxClient(UDSTransactionTestCase):
for pool in self.pclient.list_pools():
if pool.id == v['test_pool']: # id is the pool name in proxmox
self.pool = pool
if self.pool.is_null():
self.skipTest('No valid pool found')
for storage in self.pclient.list_storages():
if storage.storage == v['test_storage']:
self.storage = storage
if self.storage.is_null():
self.skipTest('No valid storage found')
self.hagroup = v['test_ha_group']
# Ensure we have a valid pool, storage and ha group
if self.hagroup not in self.pclient.list_ha_groups():
self.skipTest('No valid ha group found')
def _get_new_vmid(self) -> int:
MAX_RETRIES: typing.Final[int] = 512 # So we don't loop forever, just in case...
vmid = 0
@ -107,6 +121,37 @@ class TestProxmoxClient(UDSTransactionTestCase):
return
raise Exception('Timeout waiting for task to finish')
@contextlib.contextmanager
def _create_test_vm(
self,
vmid: typing.Optional[int] = None,
as_linked_clone: bool = False,
target_node: typing.Optional[str] = None,
target_storage: typing.Optional[str] = None,
target_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> typing.Iterator[prox_types.VMInfo]:
new_vmid = self._get_new_vmid()
res: typing.Optional[prox_types.VmCreationResult] = None
try:
res = self.pclient.clone_vm(
vmid=vmid or self.test_vm.id,
new_vmid=new_vmid,
name=f'uds-test-{new_vmid}',
description=f'UDS Test VM {new_vmid} (cloned from {self.test_vm.id})',
as_linked_clone=as_linked_clone, # Test VM is not a template, so cannot be linked cloned
target_node=target_node,
target_storage=target_storage or self.storage.storage,
target_pool=target_pool,
must_have_vgpus=must_have_vgpus,
)
# Wait for the task to finish
self._wait_for_task(res.node, res.upid.upid)
yield self.pclient.get_vm_info(res.vmid)
finally:
if res:
self.pclient.delete_vm(res.vmid)
# Connect is not needed, because setUp will do the connection so if it fails, the test will throw an exception
def test_get_cluster_info(self) -> None:
@ -166,36 +211,105 @@ class TestProxmoxClient(UDSTransactionTestCase):
self.assertIn(node.name, [n['node'] for n in self.pclient.get_cluster_resources('node')])
def test_clone_vm_ok(self) -> None:
res: typing.Optional[prox_types.VmCreationResult] = None
try:
new_vmid = self._get_new_vmid()
res = self.pclient.clone_vm(
vmid=self.test_vm.id,
new_vmid=new_vmid,
name=f'uds-test-{new_vmid}',
description='Test VM',
as_linked_clone=False, # Test VM is not a template, so cannot be linked cloned
target_node=None,
target_storage=self.storage.storage,
target_pool=None,
must_have_vgpus=None,
)
self.assertIsInstance(res, prox_types.VmCreationResult)
except Exception as e:
# Remove the vm if it was created
self.fail(f'Exception cloning vm: {e}')
finally:
if res and res.vmid:
# Wait for the task to finish
self._wait_for_task(res.node, res.upid.upid)
self.pclient.delete_vm(res.vmid)
# In fact, use the context manager to test this
# because it's the same code
with self._create_test_vm():
pass # Just test that it does not raise
def test_clone_vm_fail_invalid_vmid(self) -> None:
with self.assertRaises(prox_exceptions.ProxmoxNotFound):
with self._create_test_vm(vmid=-1):
pass
def test_clone_vm_fail_invalid_node(self) -> None:
with self.assertRaises(prox_exceptions.ProxmoxDoesNotExists):
with self._create_test_vm(target_node='invalid-node'):
pass
def test_clone_vm_fail_invalid_pool(self) -> None:
with self.assertRaises(prox_exceptions.ProxmoxDoesNotExists):
with self._create_test_vm(target_pool='invalid-pool'):
pass
def test_clone_vm_fail_invalid_storage(self) -> None:
with self.assertRaises(prox_exceptions.ProxmoxDoesNotExists):
with self._create_test_vm(target_storage='invalid-storage'):
pass
def test_clone_vm_fail_no_vgpus(self) -> None:
with self.assertRaises(prox_exceptions.ProxmoxError):
with self._create_test_vm(must_have_vgpus=True):
pass
def test_list_ha_groups(self) -> None:
groups = self.pclient.list_ha_groups()
self.assertIsInstance(groups, list)
for group in groups:
self.assertIsInstance(group, str)
self.assertIn(self.hagroup, groups)
def test_enable_disable_vm_ha(self) -> None:
with self._create_test_vm() as vm:
self.pclient.enable_vm_ha(vm.id, started=False, group=self.hagroup)
# Ensure it's enabled
vminfo = self.pclient.get_vm_info(vm.id)
self.assertEqual(vminfo.ha.group, self.hagroup)
# Disable it
self.pclient.disable_vm_ha(vm.id)
vminfo = self.pclient.get_vm_info(vm.id)
self.assertEqual(vminfo.ha.group, '')
def test_set_vm_protection(self) -> None:
with self._create_test_vm() as vm:
self.pclient.set_vm_protection(vm.id, protection=True)
vmconfig = self.pclient.get_vm_config(vm.id)
self.assertTrue(vmconfig.protection)
self.pclient.set_vm_protection(vm.id, protection=False)
vmconfig = self.pclient.get_vm_config(vm.id)
self.assertFalse(vmconfig.protection)
def test_get_guest_ip_address(self) -> None:
# Should raise an exception, because the test vm is not running
with self.assertRaises(prox_exceptions.ProxmoxError):
self.pclient.get_guest_ip_address(self.test_vm.id)
# delete_vm should work, because the vm is created and deleted in the context manager
def test_snapshots(self) -> None:
with self._create_test_vm() as vm:
# Create snapshot for the vm
task = self.pclient.create_snapshot(vm.id, name='test-snapshot')
self._wait_for_task(task.node, task.upid)
snapshots = self.pclient.list_snapshots(vm.id)
self.assertIsInstance(snapshots, list)
# should have TWO snapshots, the one created by us and "current"
self.assertTrue(len(snapshots) == 2)
for snapshot in snapshots:
self.assertIsInstance(snapshot, prox_types.SnapshotInfo)
# test-snapshot should be there
self.assertIn('test-snapshot', [s.name for s in snapshots])
# Restore the snapshot
task = self.pclient.restore_snapshot(vm.id, name='test-snapshot')
self._wait_for_task(task.node, task.upid)
# Delete the snapshot
task = self.pclient.delete_snapshot(vm.id, name='test-snapshot')
self._wait_for_task(task.node, task.upid)
snapshots = self.pclient.list_snapshots(vm.id)
self.assertTrue(len(snapshots) == 1)
# get_task_info should work, because we wait for the task to finish in _wait_for_task
def test_list_vms(self) -> None:
vms = self.pclient.list_vms()
# At least, the test vm should be there :)
self.assertTrue(len(vms) > 0)
# Assert the test vm is there
self.assertIn(self.test_vm, vms)
self.assertIn(self.test_vm.id, [i.id for i in vms])
self.assertTrue(self.test_vm.id > 0)
self.assertTrue(self.test_vm.status in prox_types.VMStatus)
@ -219,3 +333,16 @@ class TestProxmoxClient(UDSTransactionTestCase):
self.assertIsInstance(self.test_vm.diskread, (int, type(None)))
self.assertIsInstance(self.test_vm.diskwrite, (int, type(None)))
self.assertIsInstance(self.test_vm.vgpu_type, (str, type(None)))
def test_get_vm_pool_info(self) -> None:
with self._create_test_vm(target_pool=self.pool.id) as vm:
vminfo = self.pclient.get_vm_pool_info(vmid=vm.id, poolid=self.pool.id)
self.assertIsInstance(vminfo, prox_types.VMInfo)
self.assertEqual(vminfo.id, vm.id)
# get_vm_info should work, because we get the info of the test vm in setUp
def test_get_vm_config(self) -> None:
vmconfig = self.pclient.get_vm_config(self.test_vm.id)
self.assertIsInstance(vmconfig, prox_types.VMConfiguration)
self.assertEqual(vmconfig.name, self.test_vm.name)

View File

@ -104,17 +104,17 @@ class ProxmoxClient:
return self._session
self._session = security.secure_requests_session(verify=self._verify_ssl)
if self._use_api_token:
token = f'{self._credentials[0][1]}={self._credentials[1][1]}'
# Set _ticket to something, so we don't try to connect again
self._ticket = 'API_TOKEN' # Using API token, not a real ticket
self._session.headers.update(
{
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
# 'Content-Type': 'application/json',
'Authorization': f'PVEAPIToken={token}',
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
# 'Content-Type': 'application/json',
'Authorization': f'PVEAPIToken={token}',
}
)
else:
@ -158,9 +158,8 @@ class ProxmoxClient:
_update_session(ticket, csrf)
except requests.RequestException as e:
raise exceptions.ProxmoxConnectionError(str(e)) from e
return self._session
return self._session
def ensure_correct(self, response: 'requests.Response', *, node: typing.Optional[str]) -> typing.Any:
if not response.ok:
@ -365,8 +364,21 @@ class ProxmoxClient:
target_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> types.VmCreationResult:
# Get info of the vm, also ensures that the vm exists
vminfo = self.get_vm_info(vmid)
# Ensure exists target storage
if target_storage and not any(
s.storage == target_storage for s in self.list_storages(node=target_node)
):
raise exceptions.ProxmoxDoesNotExists(
f'Storage "{target_storage}" does not exist on node "{target_node}"'
)
# Ensure exists target pool, (id is in fact the name of the pool)
if target_pool and not any(p.id == target_pool for p in self.list_pools()):
raise exceptions.ProxmoxDoesNotExists(f'Pool "{target_pool}" does not exist')
src_node = vminfo.node
if not target_node:
@ -384,9 +396,15 @@ class ProxmoxClient:
else:
target_node = src_node
# Ensure exists target node
if not any(n.name == target_node for n in self.get_cluster_info().nodes):
raise exceptions.ProxmoxDoesNotExists(f'Node "{target_node}" does not exist')
# Check if mustHaveVGPUS is compatible with the node
if must_have_vgpus is not None and must_have_vgpus != bool(self.list_node_gpu_devices(target_node)):
raise exceptions.ProxmoxNoGPUError(f'Node "{target_node}" does not have VGPUS and they are required')
raise exceptions.ProxmoxNoGPUError(
f'Node "{target_node}" does not have VGPUS and they are required'
)
if self.node_has_vgpus_available(target_node, vminfo.vgpu_type):
raise exceptions.ProxmoxNoGPUError(
@ -542,10 +560,9 @@ class ProxmoxClient:
('snapname', name),
('description', description or f'UDS Snapshot created at {time.strftime("%c")}'),
]
params.append(('snapname', name or ''))
return types.UPID.from_dict(self.do_post(f'nodes/{node}/qemu/{vmid}/snapshot', data=params, node=node))
def remove_snapshot(
def delete_snapshot(
self,
vmid: int,
*,

View File

@ -43,8 +43,10 @@ class ProxmoxConnectionError(ProxmoxError, exceptions.RetryableError):
class ProxmoxAuthError(ProxmoxError, exceptions.FatalError):
pass
class ProxmoxDoesNotExists(ProxmoxError):
pass
class ProxmoxNotFound(ProxmoxError, exceptions.NotFoundError):
class ProxmoxNotFound(ProxmoxDoesNotExists, exceptions.NotFoundError):
pass
@ -52,5 +54,5 @@ class ProxmoxNodeUnavailableError(ProxmoxConnectionError):
pass
class ProxmoxNoGPUError(ProxmoxError):
class ProxmoxNoGPUError(ProxmoxDoesNotExists):
pass

View File

@ -233,12 +233,36 @@ class NetworkConfiguration:
return NetworkConfiguration(net=net, type=type, mac=mac)
@dataclasses.dataclass
class HAInfo:
state: str
group: str
managed: bool
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'HAInfo':
return HAInfo(
state=dictionary.get('state', ''),
group=dictionary.get('group', ''),
managed=dictionary.get('managed', False),
)
@staticmethod
def null() -> 'HAInfo':
return HAInfo(
state='',
group='',
managed=False,
)
@dataclasses.dataclass
class VMInfo:
id: int
status: VMStatus
node: str
template: bool
ha: HAInfo
agent: typing.Optional[str]
cpus: typing.Optional[int]
@ -263,6 +287,9 @@ class VMInfo:
raise prox_exceptions.ProxmoxNotFound('VM not found')
return self
def is_null(self) -> bool:
return self.id == -1
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMInfo':
vgpu_type = None
@ -282,6 +309,7 @@ class VMInfo:
id=int(dictionary.get('vmid', 0)),
node=dictionary.get('node', ''),
template=dictionary.get('template', False),
ha=HAInfo.from_dict(dictionary.get('ha', {})),
agent=dictionary.get('agent', None),
cpus=dictionary.get('cpus', None),
lock=dictionary.get('lock', None),
@ -308,6 +336,7 @@ class VMInfo:
id=-1,
node='',
template=False,
ha=HAInfo.null(),
agent=None,
cpus=None,
lock=None,
@ -327,9 +356,6 @@ class VMInfo:
vgpu_type=None,
)
def is_null(self) -> bool:
return self.id == -1
@dataclasses.dataclass
class VMConfiguration:
@ -343,6 +369,7 @@ class VMConfiguration:
tpmstate0: typing.Optional[str]
template: bool
protection: bool
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMConfiguration':
@ -361,6 +388,7 @@ class VMConfiguration:
networks=nets,
tpmstate0=dictionary.get('tpmstate0', ''),
template=dictionary.get('template', False),
protection=dictionary.get('protection', False),
)
@ -384,6 +412,9 @@ class StorageInfo:
avail: int
total: int
def is_null(self) -> bool:
return self.node == '' and self.storage == ''
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'StorageInfo':
if 'maxdisk' in dictionary: # From cluster/resources