1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-20 06:50:23 +03:00

Adding more proxmox client tests

This commit is contained in:
Adolfo Gómez García 2024-08-19 22:56:05 +02:00
parent 23f52b363c
commit 6f8eb00ad0
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23

View File

@ -31,6 +31,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
import time
import typing
import logging
@ -51,8 +52,9 @@ class TestProxmoxClient(UDSTransactionTestCase):
pclient: prox_client.ProxmoxClient
vm: prox_types.VMInfo = prox_types.VMInfo.null()
test_vm: prox_types.VMInfo = prox_types.VMInfo.null()
pool: prox_types.PoolInfo = prox_types.PoolInfo.null()
storage: prox_types.StorageInfo = prox_types.StorageInfo.null()
def setUp(self) -> None:
v = vars.get_vars('proxmox')
@ -70,53 +72,150 @@ class TestProxmoxClient(UDSTransactionTestCase):
for vm in self.pclient.list_vms():
if vm.name == v['test_vm']:
self.vm = vm
self.test_vm = self.pclient.get_vm_info(vm.id) # To ensure we have all the info
if self.vm.is_null():
if self.test_vm.is_null():
self.skipTest('No test vm found')
for pool in self.pclient.list_pools():
if pool.id == v['test_pool']: # id is the pool name in proxmox
self.pool = pool
for storage in self.pclient.list_storages():
if storage.storage == v['test_storage']:
self.storage = storage
def _get_new_vmid(self) -> int:
MAX_RETRIES: typing.Final[int] = 512 # So we don't loop forever, just in case...
vmid = 0
for _ in range(MAX_RETRIES):
vmid = 100000 + random.randint(0, 899999) # Get a reasonable vmid
vmid = 1000000 + random.randint(0, 899999) # Get a reasonable vmid
if self.pclient.is_vmid_available(vmid):
return vmid
# All assigned vmid will be left as unusable on UDS until released by time (3 years)
# This is not a problem at all, in the rare case that a machine id is released from uds db
# if it exists when we try to create a new one, we will simply try to get another one
self.fail(f'Could not get a new vmid!!: last tried {vmid}')
def _wait_for_task(self, node: str, upid: str, timeout: int = 16) -> None:
while timeout > 0:
timeout -= 1
task_info = self.pclient.get_task_info(node, upid)
if task_info.is_running():
time.sleep(1)
else:
return
raise Exception('Timeout waiting for task to finish')
# Connect is not needed, because setUp will do the connection so if it fails, the test will throw an exception
def test_list_vms(self):
def test_get_cluster_info(self) -> None:
cluster_info = self.pclient.get_cluster_info()
self.assertIsInstance(cluster_info, prox_types.ClusterInfo)
# May be no part of a cluster, so cluster_info.cluster can be None
self.assertIsNotNone(cluster_info.nodes)
def test_get_cluster_resources(self) -> None:
res1 = self.pclient.get_cluster_resources('vm')
res2 = self.pclient.get_cluster_resources('storage')
res3 = self.pclient.get_cluster_resources('node')
res4 = self.pclient.get_cluster_resources('sdn')
self.assertIsInstance(res1, list)
# ensure can convert to vm info
for r in res1:
prox_types.VMInfo.from_dict(r) # Should not raise
self.assertIsInstance(res2, list)
# ensure can convert to storage info
for r in res2:
prox_types.StorageInfo.from_dict(r)
self.assertIsInstance(res3, list)
# Ensure can convert to node stats
for r in res3:
prox_types.NodeStats.from_dict(r)
self.assertIsInstance(res4, list)
def test_get_node_networks(self) -> None:
networks = self.pclient.get_node_networks(self.test_vm.node)
self.assertIsInstance(networks, list)
def test_list_node_gpu_devices(self) -> None:
gpus = self.pclient.list_node_gpu_devices(self.test_vm.node)
self.assertIsInstance(gpus, list)
def test_list_node_vgpus(self) -> None:
vgpues = self.pclient.list_node_vgpus(self.test_vm.node)
self.assertIsInstance(vgpues, list)
for vgpu in vgpues:
self.assertIsInstance(vgpu, prox_types.VGPUInfo)
def test_node_has_vgpus_available(self) -> None:
# if no vgpu available, it should return False
# But here, we only test that the method does not raise an exception
self.pclient.node_has_vgpus_available(self.test_vm.node, None)
def test_get_best_node_for_vm(self) -> None:
node = self.pclient.get_best_node_for_vm()
# Node should be a NodeStats, and must be part of the nodes got from get_cluster_resources
if node is None:
self.fail('No node found')
self.assertIsInstance(node, prox_types.NodeStats)
self.assertIn(node.name, [n['node'] for n in self.pclient.get_cluster_resources('node')])
def test_clone_vm_ok(self) -> None:
res: typing.Optional[prox_types.VmCreationResult] = None
try:
new_vmid = self._get_new_vmid()
res = self.pclient.clone_vm(
vmid=self.test_vm.id,
new_vmid=new_vmid,
name=f'uds-test-{new_vmid}',
description='Test VM',
as_linked_clone=False, # Test VM is not a template, so cannot be linked cloned
target_node=None,
target_storage=self.storage.storage,
target_pool=None,
must_have_vgpus=None,
)
self.assertIsInstance(res, prox_types.VmCreationResult)
except Exception as e:
# Remove the vm if it was created
self.fail(f'Exception cloning vm: {e}')
finally:
if res and res.vmid:
# Wait for the task to finish
self._wait_for_task(res.node, res.upid.upid)
self.pclient.delete_vm(res.vmid)
def test_list_vms(self) -> None:
vms = self.pclient.list_vms()
# At least, the test vm should be there :)
self.assertTrue(len(vms) > 0)
# Assert the test vm is there
self.assertIn(self.test_vm, vms)
self.assertTrue(self.vm.id > 0)
self.assertTrue(self.vm.status in prox_types.VMStatus)
self.assertTrue(self.vm.node)
self.assertTrue(self.vm.template in (True, False))
self.assertTrue(self.test_vm.id > 0)
self.assertTrue(self.test_vm.status in prox_types.VMStatus)
self.assertTrue(self.test_vm.node)
self.assertTrue(self.test_vm.template in (True, False))
self.assertIsInstance(self.vm.agent, (str, type(None)))
self.assertIsInstance(self.vm.cpus, (int, type(None)))
self.assertIsInstance(self.vm.lock, (str, type(None)))
self.assertIsInstance(self.vm.disk, (int, type(None)))
self.assertIsInstance(self.vm.maxdisk, (int, type(None)))
self.assertIsInstance(self.vm.mem, (int, type(None)))
self.assertIsInstance(self.vm.maxmem, (int, type(None)))
self.assertIsInstance(self.vm.name, (str, type(None)))
self.assertIsInstance(self.vm.pid, (int, type(None)))
self.assertIsInstance(self.vm.qmpstatus, (str, type(None)))
self.assertIsInstance(self.vm.tags, (str, type(None)))
self.assertIsInstance(self.vm.uptime, (int, type(None)))
self.assertIsInstance(self.vm.netin, (int, type(None)))
self.assertIsInstance(self.vm.netout, (int, type(None)))
self.assertIsInstance(self.vm.diskread, (int, type(None)))
self.assertIsInstance(self.vm.diskwrite, (int, type(None)))
self.assertIsInstance(self.vm.vgpu_type, (str, type(None)))
self.assertIsInstance(self.test_vm.agent, (str, type(None)))
self.assertIsInstance(self.test_vm.cpus, (int, type(None)))
self.assertIsInstance(self.test_vm.lock, (str, type(None)))
self.assertIsInstance(self.test_vm.disk, (int, type(None)))
self.assertIsInstance(self.test_vm.maxdisk, (int, type(None)))
self.assertIsInstance(self.test_vm.mem, (int, type(None)))
self.assertIsInstance(self.test_vm.maxmem, (int, type(None)))
self.assertIsInstance(self.test_vm.name, (str, type(None)))
self.assertIsInstance(self.test_vm.pid, (int, type(None)))
self.assertIsInstance(self.test_vm.qmpstatus, (str, type(None)))
self.assertIsInstance(self.test_vm.tags, (str, type(None)))
self.assertIsInstance(self.test_vm.uptime, (int, type(None)))
self.assertIsInstance(self.test_vm.netin, (int, type(None)))
self.assertIsInstance(self.test_vm.netout, (int, type(None)))
self.assertIsInstance(self.test_vm.diskread, (int, type(None)))
self.assertIsInstance(self.test_vm.diskwrite, (int, type(None)))
self.assertIsInstance(self.test_vm.vgpu_type, (str, type(None)))