1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-20 06:50:23 +03:00

Finished refactoring of Proxmox client

This commit is contained in:
Adolfo Gómez García 2024-07-03 00:17:50 +02:00
parent 3f7cb8e3db
commit 06bc4cae4a
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
12 changed files with 992 additions and 897 deletions

View File

@ -40,26 +40,28 @@ import uuid
from uds.core import types, environment
from uds.core.ui.user_interface import gui
import uds.services.Proxmox.proxmox.client
from ...utils.autospec import autospec, AutoSpecMethodInfo
from uds.services.Proxmox import (
deployment_linked,
provider,
proxmox,
service_fixed,
publication,
deployment_fixed,
service_linked,
)
NODES: typing.Final[list[proxmox.types.Node]] = [
proxmox.types.Node(name='node0', online=True, local=True, nodeid=1, ip='0.0.0.1', level='level', id='id'),
proxmox.types.Node(name='node1', online=True, local=True, nodeid=2, ip='0.0.0.2', level='level', id='id'),
from uds.services.Proxmox.proxmox import types as prox_types
NODES: typing.Final[list[prox_types.Node]] = [
prox_types.Node(name='node0', online=True, local=True, nodeid=1, ip='0.0.0.1', level='level', id='id'),
prox_types.Node(name='node1', online=True, local=True, nodeid=2, ip='0.0.0.2', level='level', id='id'),
]
NODE_STATS: typing.Final[list[proxmox.types.NodeStats]] = [
proxmox.types.NodeStats(
NODE_STATS: typing.Final[list[prox_types.NodeStats]] = [
prox_types.NodeStats(
name='name',
status='status',
uptime=1,
@ -72,7 +74,7 @@ NODE_STATS: typing.Final[list[proxmox.types.NodeStats]] = [
cpu=1.0,
maxcpu=1,
),
proxmox.types.NodeStats(
prox_types.NodeStats(
name='name',
status='status',
uptime=1,
@ -88,13 +90,13 @@ NODE_STATS: typing.Final[list[proxmox.types.NodeStats]] = [
]
CLUSTER_INFO: typing.Final[proxmox.types.ClusterInfo] = proxmox.types.ClusterInfo(
cluster=proxmox.types.Cluster(name='name', version='version', id='id', nodes=2, quorate=1),
CLUSTER_INFO: typing.Final[prox_types.ClusterInfo] = prox_types.ClusterInfo(
cluster=prox_types.Cluster(name='name', version='version', id='id', nodes=2, quorate=1),
nodes=NODES,
)
STORAGES: typing.Final[list[proxmox.types.StorageInfo]] = [
proxmox.types.StorageInfo(
STORAGES: typing.Final[list[prox_types.StorageInfo]] = [
prox_types.StorageInfo(
node=NODES[i % len(NODES)].name,
storage=f'storage_{i}',
content=(f'content{i}',) * (i % 3),
@ -110,22 +112,22 @@ STORAGES: typing.Final[list[proxmox.types.StorageInfo]] = [
]
VGPUS: typing.Final[list[proxmox.types.VGPUInfo]] = [
proxmox.types.VGPUInfo(
VGPUS: typing.Final[list[prox_types.VGPUInfo]] = [
prox_types.VGPUInfo(
name='name_1',
description='description_1',
device='device_1',
available=True,
type='gpu_type_1',
),
proxmox.types.VGPUInfo(
prox_types.VGPUInfo(
name='name_2',
description='description_2',
device='device_2',
available=False,
type='gpu_type_2',
),
proxmox.types.VGPUInfo(
prox_types.VGPUInfo(
name='name_3',
description='description_3',
device='device_3',
@ -141,8 +143,8 @@ HA_GROUPS: typing.Final[list[str]] = [
'ha_group_4',
]
VMS_INFO: list[proxmox.types.VMInfo] = [
proxmox.types.VMInfo(
VMS_INFO: list[prox_types.VMInfo] = [
prox_types.VMInfo(
status='stopped',
vmid=i,
node=NODES[i % len(NODES)].name,
@ -168,8 +170,8 @@ VMS_INFO: list[proxmox.types.VMInfo] = [
for i in range(1, 16)
]
VMS_CONFIGURATION: typing.Final[list[proxmox.types.VMConfiguration]] = [
proxmox.types.VMConfiguration(
VMS_CONFIGURATION: typing.Final[list[prox_types.VMConfiguration]] = [
prox_types.VMConfiguration(
name=f'vm_name_{i}',
vga='cirrus',
sockets=1,
@ -177,7 +179,7 @@ VMS_CONFIGURATION: typing.Final[list[proxmox.types.VMConfiguration]] = [
vmgenid='vmgenid',
digest='digest',
networks=[
proxmox.types.NetworkConfiguration(
prox_types.NetworkConfiguration(
net='net', type='type', mac=f'{i:02x}:{i+1:02x}:{i+2:02x}:{i+3:02x}:{i+4:02x}:{i+5:02x}'
)
],
@ -188,7 +190,7 @@ VMS_CONFIGURATION: typing.Final[list[proxmox.types.VMConfiguration]] = [
]
UPID: typing.Final[proxmox.types.UPID] = proxmox.types.UPID(
UPID: typing.Final[prox_types.UPID] = prox_types.UPID(
node=NODES[0].name,
pid=1,
pstart=1,
@ -200,15 +202,15 @@ UPID: typing.Final[proxmox.types.UPID] = proxmox.types.UPID(
)
VM_CREATION_RESULT: typing.Final[proxmox.types.VmCreationResult] = proxmox.types.VmCreationResult(
VM_CREATION_RESULT: typing.Final[prox_types.VmCreationResult] = prox_types.VmCreationResult(
node=NODES[0].name,
vmid=VMS_INFO[0].vmid,
upid=UPID,
)
SNAPSHOTS_INFO: typing.Final[list[proxmox.types.SnapshotInfo]] = [
proxmox.types.SnapshotInfo(
SNAPSHOTS_INFO: typing.Final[list[prox_types.SnapshotInfo]] = [
prox_types.SnapshotInfo(
name=f'snap_name_{i}',
description=f'snap desription{i}',
parent=f'snap_parent_{i}',
@ -218,7 +220,7 @@ SNAPSHOTS_INFO: typing.Final[list[proxmox.types.SnapshotInfo]] = [
for i in range(10)
]
TASK_STATUS = proxmox.types.TaskStatus(
TASK_STATUS = prox_types.TaskStatus(
node=NODES[0].name,
pid=1,
pstart=1,
@ -231,8 +233,8 @@ TASK_STATUS = proxmox.types.TaskStatus(
id='id',
)
POOL_MEMBERS: typing.Final[list[proxmox.types.PoolMemberInfo]] = [
proxmox.types.PoolMemberInfo(
POOL_MEMBERS: typing.Final[list[prox_types.PoolMemberInfo]] = [
prox_types.PoolMemberInfo(
id=f'id_{i}',
node=NODES[i % len(NODES)].name,
storage=STORAGES[i % len(STORAGES)].storage,
@ -243,8 +245,8 @@ POOL_MEMBERS: typing.Final[list[proxmox.types.PoolMemberInfo]] = [
for i in range(10)
]
POOLS: typing.Final[list[proxmox.types.PoolInfo]] = [
proxmox.types.PoolInfo(
POOLS: typing.Final[list[prox_types.PoolInfo]] = [
prox_types.PoolInfo(
poolid=f'pool_{i}',
comments=f'comments_{i}',
members=POOL_MEMBERS,
@ -269,7 +271,7 @@ CONSOLE_CONNECTION_INFO: typing.Final[types.services.ConsoleConnectionInfo] = (
)
def replace_vm_info(vmid: int, **kwargs: typing.Any) -> proxmox.types.UPID:
def replace_vm_info(vmid: int, **kwargs: typing.Any) -> prox_types.UPID:
"""
Set the values of VMS_INFO[vmid - 1]
"""
@ -280,7 +282,7 @@ def replace_vm_info(vmid: int, **kwargs: typing.Any) -> proxmox.types.UPID:
return UPID
def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., proxmox.types.UPID]:
def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., prox_types.UPID]:
return functools.partial(replace_vm_info, **kwargs)
@ -288,87 +290,87 @@ def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., proxmox.types
CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
# connect returns None
# Test method
AutoSpecMethodInfo(proxmox.ProxmoxClient.test, returns=True),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.test, returns=True),
# get_cluster_info
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_cluster_info, returns=CLUSTER_INFO),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_cluster_info, returns=CLUSTER_INFO),
# get_next_vmid
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_next_vmid, returns=1),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_next_vmid, returns=1),
# is_vmid_available
AutoSpecMethodInfo(proxmox.ProxmoxClient.is_vmid_available, returns=True),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.is_vmid_available, returns=True),
# get_node_networks, not called never (ensure it's not called by mistake)
# list_node_gpu_devices
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_node_gpu_devices, returns=['gpu_dev_1', 'gpu_dev_2']),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_node_gpu_devices, returns=['gpu_dev_1', 'gpu_dev_2']),
# list_node_vgpus
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_node_vgpus, returns=VGPUS),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_node_vgpus, returns=VGPUS),
# node_has_vgpus_available
AutoSpecMethodInfo(proxmox.ProxmoxClient.node_has_vgpus_available, returns=True),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.node_has_vgpus_available, returns=True),
# get_best_node_for_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_best_node_for_machine, returns=NODE_STATS[0]),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_best_node_for_machine, returns=NODE_STATS[0]),
# clone_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.clone_machine, returns=VM_CREATION_RESULT),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.clone_machine, returns=VM_CREATION_RESULT),
# list_ha_groups
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_ha_groups, returns=HA_GROUPS),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_ha_groups, returns=HA_GROUPS),
# enable_machine_ha return None
# disable_machine_ha return None
# set_protection return None
# get_guest_ip_address
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_guest_ip_address, returns=GUEST_IP_ADDRESS),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_guest_ip_address, returns=GUEST_IP_ADDRESS),
# remove_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.remove_machine, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.remove_machine, returns=UPID),
# list_snapshots
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_snapshots, returns=SNAPSHOTS_INFO),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_snapshots, returns=SNAPSHOTS_INFO),
# supports_snapshot
AutoSpecMethodInfo(proxmox.ProxmoxClient.supports_snapshot, returns=True),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.supports_snapshot, returns=True),
# create_snapshot
AutoSpecMethodInfo(proxmox.ProxmoxClient.create_snapshot, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.create_snapshot, returns=UPID),
# remove_snapshot
AutoSpecMethodInfo(proxmox.ProxmoxClient.remove_snapshot, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.remove_snapshot, returns=UPID),
# restore_snapshot
AutoSpecMethodInfo(proxmox.ProxmoxClient.restore_snapshot, returns=UPID),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.restore_snapshot, returns=UPID),
# get_task
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_task, returns=TASK_STATUS),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_task, returns=TASK_STATUS),
# list_machines
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_machines, returns=VMS_INFO),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_machines, returns=VMS_INFO),
# get_machine_pool_info
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_machine_pool_info,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_pool_info,
returns=lambda vmid, poolid, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
),
# get_machine_info
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_machine_info,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_info,
returns=lambda vmid, *args, **kwargs: VMS_INFO[vmid - 1], # pyright: ignore
),
# get_machine_configuration
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_machine_configuration,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_machine_configuration,
returns=lambda vmid, **kwargs: VMS_CONFIGURATION[vmid - 1], # pyright: ignore
),
# enable_machine_ha return None
# start_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.start_machine, returns=replacer_vm_info(status='running')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.start_machine, returns=replacer_vm_info(status='running')),
# stop_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.stop_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.stop_machine, returns=replacer_vm_info(status='stopped')),
# reset_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.reset_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.reset_machine, returns=replacer_vm_info(status='stopped')),
# suspend_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.suspend_machine, returns=replacer_vm_info(status='suspended')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.suspend_machine, returns=replacer_vm_info(status='suspended')),
# resume_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.resume_machine, returns=replacer_vm_info(status='running')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.resume_machine, returns=replacer_vm_info(status='running')),
# shutdown_machine
AutoSpecMethodInfo(proxmox.ProxmoxClient.shutdown_machine, returns=replacer_vm_info(status='stopped')),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.shutdown_machine, returns=replacer_vm_info(status='stopped')),
# convert_to_template
AutoSpecMethodInfo(proxmox.ProxmoxClient.convert_to_template, returns=replacer_vm_info(template=True)),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.convert_to_template, returns=replacer_vm_info(template=True)),
# get_storage
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_storage,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_storage,
returns=lambda storage, node, **kwargs: next( # pyright: ignore
filter(lambda s: s.storage == storage, STORAGES) # pyright: ignore
),
),
# list_storages
AutoSpecMethodInfo(
proxmox.ProxmoxClient.list_storages,
uds.services.Proxmox.proxmox.client.ProxmoxClient.list_storages,
returns=lambda node, **kwargs: ( # pyright: ignore
(list(filter(lambda s: s.node == node, STORAGES))) # pyright: ignore
if node is not None
@ -377,24 +379,24 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
),
# get_node_stats
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_node_stats,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_node_stats,
returns=lambda node, **kwargs: next( # pyright: ignore
filter(lambda n: n.name == node, NODE_STATS) # pyright: ignore
),
),
# list_pools
AutoSpecMethodInfo(proxmox.ProxmoxClient.list_pools, returns=POOLS),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.list_pools, returns=POOLS),
# get_pool_info
AutoSpecMethodInfo(
proxmox.ProxmoxClient.get_pool_info,
uds.services.Proxmox.proxmox.client.ProxmoxClient.get_pool_info,
returns=lambda poolid, **kwargs: next( # pyright: ignore
filter(lambda p: p.poolid == poolid, POOLS) # pyright: ignore
),
),
# get_console_connection
AutoSpecMethodInfo(proxmox.ProxmoxClient.get_console_connection, returns=CONSOLE_CONNECTION_INFO),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.get_console_connection, returns=CONSOLE_CONNECTION_INFO),
# journal
AutoSpecMethodInfo(proxmox.ProxmoxClient.journal, returns=['journal line 1', 'journal line 2']),
AutoSpecMethodInfo(uds.services.Proxmox.proxmox.client.ProxmoxClient.journal, returns=['journal line 1', 'journal line 2']),
]
PROVIDER_VALUES_DICT: typing.Final[gui.ValuesDictType] = {
@ -436,7 +438,7 @@ def create_client_mock() -> mock.Mock:
"""
Create a mock of ProxmoxClient
"""
return autospec(proxmox.ProxmoxClient, CLIENT_METHODS_INFO)
return autospec(uds.services.Proxmox.proxmox.client.ProxmoxClient, CLIENT_METHODS_INFO)
@contextlib.contextmanager

View File

@ -37,7 +37,7 @@ from uds.core import types
from uds.core.services.generics.fixed.userservice import FixedUserService, Operation
from uds.core.util import autoserializable
from . import proxmox
from .proxmox import types as prox_types, exceptions as prox_exceptions
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -61,7 +61,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
# : Recheck every ten seconds by default (for task methods)
suggested_delay = 4
def _store_task(self, upid: 'proxmox.types.UPID') -> None:
def _store_task(self, upid: 'prox_types.UPID') -> None:
self._task = '\t'.join([upid.node, upid.upid])
def _retrieve_task(self) -> tuple[str, str]:
@ -78,7 +78,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
try:
vminfo = self.service().get_machine_info(int(self._vmid))
except proxmox.ProxmoxConnectionError:
except prox_exceptions.ProxmoxConnectionError:
raise # If connection fails, let it fail on parent
except Exception as e:
return self.error(f'Machine not found: {e}')
@ -105,7 +105,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
def op_start(self) -> None:
try:
vminfo = self.service().get_machine_info(int(self._vmid))
except proxmox.ProxmoxConnectionError:
except prox_exceptions.ProxmoxConnectionError:
self.retry_later()
return
except Exception as e:
@ -123,7 +123,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
try:
task = self.service().provider().get_task_info(node, upid)
except proxmox.ProxmoxConnectionError:
except prox_exceptions.ProxmoxConnectionError:
return types.states.TaskState.RUNNING # Try again later
if task.is_errored():

View File

@ -39,8 +39,9 @@ from uds.core import types, consts
from uds.core.services.generics.dynamic.userservice import DynamicUserService
from uds.core.managers.userservice import UserServiceManager
from uds.core.util import autoserializable
import uds.services.Proxmox.proxmox.exceptions
from . import proxmox
from .proxmox import types as prox_types
# Not imported at runtime, just for type checking
@ -117,7 +118,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
_task = autoserializable.StringField(default='')
def _store_task(self, upid: 'proxmox.types.UPID') -> None:
def _store_task(self, upid: 'prox_types.UPID') -> None:
self._task = ','.join([upid.node, upid.upid])
def _retrieve_task(self) -> tuple[str, str]:
@ -132,7 +133,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
try:
task = self.service().provider().get_task_info(node, upid)
except proxmox.ProxmoxConnectionError:
except uds.services.Proxmox.proxmox.exceptions.ProxmoxConnectionError:
return types.states.TaskState.RUNNING # Try again later
if task.is_errored():
@ -204,7 +205,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
# Set vm mac address now on first interface
self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
except proxmox.ProxmoxConnectionError:
except uds.services.Proxmox.proxmox.exceptions.ProxmoxConnectionError:
self.retry_later() # Push nop to front of queue, so it is consumed instead of this one
return
except Exception as e:

View File

@ -36,9 +36,10 @@ from uds.core import jobs
from uds.models import Provider
from uds.core.util.model import sql_stamp_seconds
from uds.core.util.unique_id_generator import UniqueIDGenerator
import uds.services.Proxmox.proxmox.exceptions
from . import provider
from . import proxmox
from .proxmox import types as prox_types
# Note that even reseting, UDS will get always a FREE vmid, so even if the machine is already in use
# (and removed from used db), it will not be reused until it has disappeared from the proxmox server
@ -104,7 +105,7 @@ class ProxmoxDeferredRemoval(jobs.Job):
@staticmethod
def waitForTaskFinish(
providerInstance: 'provider.ProxmoxProvider',
upid: 'proxmox.types.UPID',
upid: 'prox_types.UPID',
maxWait: int = 30, # 30 * 0.3 = 9 seconds
) -> bool:
counter = 0
@ -146,7 +147,7 @@ class ProxmoxDeferredRemoval(jobs.Job):
# It this is reached, remove check
storage.remove('tr' + str(vmid))
except proxmox.ProxmoxNotFound:
except uds.services.Proxmox.proxmox.exceptions.ProxmoxNotFound:
storage.remove('tr' + str(vmid)) # VM does not exists anymore
except Exception as e: # Any other exception will be treated again
# instance.log('Delayed removal of %s has failed: %s. Will retry later', vmId, e)

View File

@ -38,7 +38,7 @@ from uds.core.util import validators, fields
from uds.core.util.decorators import cached
from uds.core.util.unique_id_generator import UniqueIDGenerator
from . import proxmox
from .proxmox import client, types as prox_types, exceptions as prox_exceptions
from .service_linked import ProxmoxServiceLinked
from .service_fixed import ProxmoxServiceFixed
@ -119,15 +119,15 @@ class ProxmoxProvider(services.ServiceProvider):
macs_range = fields.macs_range_field(default='52:54:00:00:00:00-52:54:00:FF:FF:FF')
# Own variables
_cached_api: typing.Optional[proxmox.ProxmoxClient] = None
_cached_api: typing.Optional[client.ProxmoxClient] = None
_vmid_generator: UniqueIDGenerator
def _api(self) -> proxmox.ProxmoxClient:
def _api(self) -> client.ProxmoxClient:
"""
Returns the connection API object
"""
if self._cached_api is None:
self._cached_api = proxmox.ProxmoxClient(
self._cached_api = client.ProxmoxClient(
self.host.value,
self.port.as_int(),
self.username.value,
@ -166,29 +166,29 @@ class ProxmoxProvider(services.ServiceProvider):
return self._api().test()
def list_machines(self, force: bool = False) -> list[proxmox.types.VMInfo]:
def list_machines(self, force: bool = False) -> list[prox_types.VMInfo]:
return self._api().list_machines(force=force)
def get_machine_info(self, vmid: int, poolid: typing.Optional[str] = None) -> proxmox.types.VMInfo:
def get_machine_info(self, vmid: int, poolid: typing.Optional[str] = None) -> prox_types.VMInfo:
return self._api().get_machine_pool_info(vmid, poolid, force=True)
def get_machine_configuration(self, vmid: int) -> proxmox.types.VMConfiguration:
def get_machine_configuration(self, vmid: int) -> prox_types.VMConfiguration:
return self._api().get_machine_configuration(vmid, force=True)
def get_storage_info(self, storageid: str, node: str, force: bool = False) -> proxmox.types.StorageInfo:
def get_storage_info(self, storageid: str, node: str, force: bool = False) -> prox_types.StorageInfo:
return self._api().get_storage(storageid, node, force=force)
def list_storages(
self, node: typing.Optional[str] = None, force: bool = False
) -> list[proxmox.types.StorageInfo]:
) -> list[prox_types.StorageInfo]:
return self._api().list_storages(node=node, content='images', force=force)
def list_pools(self, force: bool = False) -> list[proxmox.types.PoolInfo]:
def list_pools(self, force: bool = False) -> list[prox_types.PoolInfo]:
return self._api().list_pools(force=force)
def get_pool_info(
self, pool_id: str, retrieve_vm_names: bool = False, force: bool = False
) -> proxmox.types.PoolInfo:
) -> prox_types.PoolInfo:
return self._api().get_pool_info(pool_id, retrieve_vm_names=retrieve_vm_names, force=force)
def create_template(self, vmid: int) -> None:
@ -204,7 +204,7 @@ class ProxmoxProvider(services.ServiceProvider):
target_storage: typing.Optional[str] = None,
target_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> proxmox.types.VmCreationResult:
) -> prox_types.VmCreationResult:
return self._api().clone_machine(
vmid,
self.get_new_vmid(),
@ -217,25 +217,25 @@ class ProxmoxProvider(services.ServiceProvider):
must_have_vgpus,
)
def start_machine(self, vmid: int) -> proxmox.types.UPID:
def start_machine(self, vmid: int) -> prox_types.UPID:
return self._api().start_machine(vmid)
def stop_machine(self, vmid: int) -> proxmox.types.UPID:
def stop_machine(self, vmid: int) -> prox_types.UPID:
return self._api().stop_machine(vmid)
def reset_machine(self, vmid: int) -> proxmox.types.UPID:
def reset_machine(self, vmid: int) -> prox_types.UPID:
return self._api().reset_machine(vmid)
def suspend_machine(self, vmId: int) -> proxmox.types.UPID:
def suspend_machine(self, vmId: int) -> prox_types.UPID:
return self._api().suspend_machine(vmId)
def shutdown_machine(self, vmid: int) -> proxmox.types.UPID:
def shutdown_machine(self, vmid: int) -> prox_types.UPID:
return self._api().shutdown_machine(vmid)
def remove_machine(self, vmid: int) -> proxmox.types.UPID:
def remove_machine(self, vmid: int) -> prox_types.UPID:
return self._api().remove_machine(vmid)
def get_task_info(self, node: str, upid: str) -> proxmox.types.TaskStatus:
def get_task_info(self, node: str, upid: str) -> prox_types.TaskStatus:
return self._api().get_task(node, upid)
def enable_machine_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
@ -270,7 +270,7 @@ class ProxmoxProvider(services.ServiceProvider):
# All assigned vmid will be left as unusable on UDS until released by time (3 years)
# This is not a problem at all, in the rare case that a machine id is released from uds db
# if it exists when we try to create a new one, we will simply try to get another one
raise proxmox.ProxmoxError(f'Could not get a new vmid!!: last tried {vmid}')
raise prox_exceptions.ProxmoxError(f'Could not get a new vmid!!: last tried {vmid}')
def get_guest_ip_address(self, vmid: int, node: typing.Optional[str] = None, ip_version: typing.Literal['4', '6', ''] = '') -> str:
return self._api().get_guest_ip_address(vmid, node, ip_version)
@ -280,7 +280,7 @@ class ProxmoxProvider(services.ServiceProvider):
def get_current_snapshot(
self, vmid: int, node: typing.Optional[str] = None
) -> typing.Optional[proxmox.types.SnapshotInfo]:
) -> typing.Optional[prox_types.SnapshotInfo]:
return (
sorted(
filter(lambda x: x.snaptime, self._api().list_snapshots(vmid, node)),
@ -296,12 +296,12 @@ class ProxmoxProvider(services.ServiceProvider):
node: typing.Optional[str] = None,
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
) -> proxmox.types.UPID:
) -> prox_types.UPID:
return self._api().create_snapshot(vmid, node, name, description)
def restore_snapshot(
self, vmid: int, node: typing.Optional[str] = None, name: typing.Optional[str] = None
) -> proxmox.types.UPID:
) -> prox_types.UPID:
"""
In fact snapshot is not optional, but node is and want to keep the same signature as the api
"""

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019-2021 Virtual Cable S.L.U.
# All rights reserved.
@ -27,788 +25,6 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import logging
import re
import time
import typing
import urllib.parse
import requests
from uds.core import consts, types as core_types
from uds.core.util import security
from uds.core.util.decorators import cached, ensure_connected
from uds.core.services.generics import exceptions
from . import types
# DEFAULT_PORT = 8006

# Cache TTLs (in seconds) used by the @cached decorators below; the actual
# values come from the core constants module.
CACHE_DURATION: typing.Final[int] = consts.cache.DEFAULT_CACHE_TIMEOUT
CACHE_INFO_DURATION: typing.Final[int] = consts.cache.SHORT_CACHE_TIMEOUT
# Cache duration is 3 minutes, so this is 60 mins * 24 = 1 day (default)
# NOTE(review): the "3 minutes" figure depends on consts.cache values defined
# elsewhere -- confirm against uds.core.consts.cache.
CACHE_DURATION_LONG: typing.Final[int] = consts.cache.EXTREME_CACHE_TIMEOUT

# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
    from uds.core.util.cache import Cache

logger = logging.getLogger(__name__)
class ProxmoxError(exceptions.Error):
    """Base class for every error raised by this Proxmox client."""
    pass


class ProxmoxConnectionError(exceptions.RetryableError):
    """The Proxmox API could not be reached; retryable, callers may try again later."""
    pass


class ProxmoxAuthError(exceptions.FatalError):
    """Authentication/authorization failure (raised on HTTP 403); fatal, retrying will not help."""
    pass


class ProxmoxNotFound(exceptions.NotFoundError):
    """The requested object does not exist on the Proxmox server."""
    pass


class ProxmoxNodeUnavailableError(ProxmoxConnectionError):
    """A node is unreachable (Proxmox-specific HTTP status 595); treated as a connection error."""
    pass


class ProxmoxNoGPUError(ProxmoxError):
    """vGPU-related error."""
    pass
def caching_key_helper(obj: 'ProxmoxClient') -> str:
    """Key helper for @cached: partitions cache entries by Proxmox host."""
    host: str = obj._host  # pylint: disable=protected-access
    return host
class ProxmoxClient:
    """Thin REST client for the Proxmox VE HTTP API ('https://host:port/api2/json/...')."""

    # Connection target
    _host: str
    _port: int
    # Login form data as ((key, value), (key, value)) pairs, posted to 'access/ticket'
    _credentials: tuple[tuple[str, str], tuple[str, str]]
    # Base URL, computed from host/port in __init__
    _url: str
    # Whether to verify the server TLS certificate
    _validate_cert: bool
    # Per-request timeout, in seconds
    _timeout: int
    # Auth state filled in by connect(): PVE ticket cookie and CSRF prevention token
    _ticket: str
    _csrf: str
    # Optional external cache, used for the auth ticket and @cached results
    cache: typing.Optional['Cache']
def __init__(
    self,
    host: str,
    port: int,
    username: str,
    password: str,
    timeout: int = 5,
    validate_certificate: bool = False,
    cache: typing.Optional['Cache'] = None,
) -> None:
    """
    Store connection parameters; no network traffic happens here.

    Authentication is deferred to connect(), which fills _ticket/_csrf.
    """
    self._host = host
    self._port = port
    # Shape expected by the 'access/ticket' endpoint (form-encoded pairs)
    self._credentials = (('username', username), ('password', password))
    self._validate_cert = validate_certificate
    self._timeout = timeout
    self._url = 'https://{}:{}/api2/json/'.format(host, port)
    self.cache = cache
    # Not authenticated yet
    self._ticket = ''
    self._csrf = ''
@property
def headers(self) -> dict[str, str]:
    """Common HTTP headers for every API call, including the current CSRF token."""
    hdrs: dict[str, str] = {
        'Accept': 'application/json',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    # CSRF token is empty until connect() succeeds
    hdrs['CSRFPreventionToken'] = self._csrf
    return hdrs
def ensure_correct(self, response: 'requests.Response', *, node: typing.Optional[str]) -> typing.Any:
    """
    Validate an API response, mapping HTTP errors to typed Proxmox exceptions.

    Returns the decoded JSON body on success. When *node* is given, an HTTP 500
    triggers a best-effort read of that node's journal to enrich the error message.
    """
    if not response.ok:
        logger.debug('Error on request %s: %s', response.status_code, response.content)
        error_message = 'Status code {}'.format(response.status_code)
        if response.status_code == 595:
            # Proxmox-specific status: target node is not available
            raise ProxmoxNodeUnavailableError(response.content.decode('utf8'))
        if response.status_code == 403:
            raise ProxmoxAuthError(response.content.decode('utf8'))
        if response.status_code == 400:
            try:
                error_message = 'Errors on request: {}'.format(response.json()['errors'])
            except Exception:  # nosec: No JSON or no errors, use default msg
                pass
        if response.status_code == 500 and node:
            # Try to get from journal
            try:
                journal = [x for x in filter(lambda x: 'failed' in x, self.journal(node, 4))]
                logger.error('Proxmox error 500:')
                for line in journal:
                    logger.error(' * %s', line)
                error_message = f'Error 500 on request: {" ## ".join(journal)}'
            except Exception:
                pass  # If we can't get journal, just use default message
        # Any other non-ok status falls through to a generic ProxmoxError
        raise ProxmoxError(error_message)
    return response.json()
def _compose_url_for(self, path: str) -> str:
    """Join the base API URL with a relative *path*."""
    return ''.join((self._url, path))
def _get(self, path: str, *, node: typing.Optional[str] = None) -> typing.Any:
    """
    Authenticated GET against the API; returns the validated JSON body.

    Raises ProxmoxConnectionError on network failure; *node* is only used
    to enrich 500-error diagnostics in ensure_correct().
    """
    try:
        session = security.secure_requests_session(verify=self._validate_cert)
        response = session.get(
            self._compose_url_for(path),
            headers=self.headers,
            cookies={'PVEAuthCookie': self._ticket},
            timeout=self._timeout,
        )
        logger.debug('GET result to %s: %s -- %s', path, response.status_code, response.content)
    except requests.ConnectionError as e:
        raise ProxmoxConnectionError(str(e))
    return self.ensure_correct(response, node=node)
def _post(
    self,
    path: str,
    data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
    *,
    node: typing.Optional[str] = None,
) -> typing.Any:
    """
    Authenticated POST against the API; returns the validated JSON body.

    *data* is an iterable of form-encoded (key, value) pairs. Raises
    ProxmoxConnectionError on network failure.
    """
    try:
        session = security.secure_requests_session(verify=self._validate_cert)
        response = session.post(
            self._compose_url_for(path),
            data=data,  # type: ignore
            headers=self.headers,
            cookies={'PVEAuthCookie': self._ticket},
            timeout=self._timeout,
        )
        logger.debug('POST result to %s: %s -- %s', path, response.status_code, response.content)
    except requests.ConnectionError as e:
        raise ProxmoxConnectionError(str(e))
    return self.ensure_correct(response, node=node)
def _delete(
    self,
    path: str,
    data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
    *,
    node: typing.Optional[str] = None,
) -> typing.Any:
    """
    Authenticated DELETE against the API; returns the validated JSON body.

    Raises ProxmoxConnectionError on network failure.
    """
    try:
        session = security.secure_requests_session(verify=self._validate_cert)
        response = session.delete(
            self._compose_url_for(path),
            data=data,  # type: ignore
            headers=self.headers,
            cookies={'PVEAuthCookie': self._ticket},
            timeout=self._timeout,
        )
        logger.debug(
            'DELETE result to %s: %s -- %s -- %s',
            path,
            response.status_code,
            response.content,
            response.headers,
        )
    except requests.ConnectionError as e:
        raise ProxmoxConnectionError(str(e))
    return self.ensure_correct(response, node=node)
def connect(self, force: bool = False) -> None:
    """
    Authenticate against 'access/ticket', filling _ticket and _csrf.

    The (ticket, csrf) pair is stored in the external cache for 30 minutes
    when one is available; *force* skips the cache read (but not the write).

    Raises ProxmoxAuthError on a non-ok login response and
    ProxmoxConnectionError on any requests-level failure.
    """
    # NOTE(review): an existing ticket short-circuits even when force=True --
    # confirm this is intended (force only bypasses the cache lookup).
    if self._ticket:
        return  # Already connected
    # we could cache this for a while, we know that at least for 30 minutes
    if self.cache and not force:
        dc = self.cache.get(self._host + 'conn')
        if dc:  # Stored on cache
            self._ticket, self._csrf = dc
            return
    try:
        result = security.secure_requests_session(verify=self._validate_cert).post(
            url=self._compose_url_for('access/ticket'),
            data=self._credentials,
            headers=self.headers,
            timeout=self._timeout,
        )
        if not result.ok:
            raise ProxmoxAuthError(result.content.decode('utf8'))
        data = result.json()['data']
        self._ticket = data['ticket']
        self._csrf = data['CSRFPreventionToken']
        if self.cache:
            self.cache.put(self._host + 'conn', (self._ticket, self._csrf), validity=1800)  # 30 minutes
    except requests.RequestException as e:
        raise ProxmoxConnectionError(str(e)) from e
def test(self) -> bool:
    """Return True if we can authenticate against the configured host."""
    try:
        self.connect()
        return True
    except Exception:
        # Any failure (connection, auth, ...) means the provider is unusable
        return False
@ensure_connected
@cached('cluster', CACHE_DURATION, key_helper=caching_key_helper)
def get_cluster_info(self, **kwargs: typing.Any) -> types.ClusterInfo:
    """Fetch cluster status; **kwargs is consumed by the @cached decorator."""
    raw = self._get('cluster/status')
    return types.ClusterInfo.from_dict(raw)
    @ensure_connected
    def get_next_vmid(self) -> int:
        """Asks the cluster for the next free VM id."""
        return int(self._get('cluster/nextid')['data'])
@ensure_connected
def is_vmid_available(self, vmid: int) -> bool:
try:
self._get(f'cluster/nextid?vmid={vmid}')
except Exception: # Not available
return False
return True
    @ensure_connected
    @cached('nodeNets', CACHE_DURATION, args=1, kwargs=['node'], key_helper=caching_key_helper)
    def get_node_networks(self, node: str, **kwargs: typing.Any) -> typing.Any:
        """Returns the raw network configuration entries of a node; cached per node."""
        return self._get(f'nodes/{node}/network', node=node)['data']
    # pylint: disable=unused-argument
    @ensure_connected
    @cached('nodeGpuDevices', CACHE_DURATION_LONG, key_helper=caching_key_helper)
    def list_node_gpu_devices(self, node: str, **kwargs: typing.Any) -> list[str]:
        """Returns the ids of the node's PCI devices that support mediated devices (vGPU capable); cached."""
        return [
            device['id']
            for device in self._get(f'nodes/{node}/hardware/pci', node=node)['data']
            if device.get('mdev')
        ]
    @ensure_connected
    def list_node_vgpus(self, node: str, **kwargs: typing.Any) -> list[types.VGPUInfo]:
        """Returns every vGPU (mdev) exposed by each vGPU-capable PCI device of the node."""
        return [
            types.VGPUInfo.from_dict(gpu)
            for device in self.list_node_gpu_devices(node)
            for gpu in self._get(f'nodes/{node}/hardware/pci/{device}/mdev', node=node)['data']
        ]
@ensure_connected
def node_has_vgpus_available(
self, node: str, vgpu_type: typing.Optional[str], **kwargs: typing.Any
) -> bool:
return any(
gpu.available and (vgpu_type is None or gpu.type == vgpu_type) for gpu in self.list_node_vgpus(node)
)
@ensure_connected
def get_best_node_for_machine(
self,
min_memory: int = 0,
must_have_vgpus: typing.Optional[bool] = None,
mdev_type: typing.Optional[str] = None,
) -> typing.Optional[types.NodeStats]:
'''
Returns the best node to create a VM on
Args:
minMemory (int, optional): Minimum memory required. Defaults to 0.
mustHaveVGPUS (typing.Optional[bool], optional): If the node must have VGPUS. True, False or None (don't care). Defaults to None.
'''
best = types.NodeStats.null()
node: types.NodeStats
# Function to calculate the weight of a node
def calc_weight(x: types.NodeStats) -> float:
return (x.mem / x.maxmem) + (x.cpu / x.maxcpu) * 1.3
# Offline nodes are not "the best"
for node in filter(lambda x: x.status == 'online', self.get_node_stats()):
if min_memory and node.mem < min_memory + 512000000: # 512 MB reserved
continue # Skips nodes with not enouhg memory
if must_have_vgpus is not None and must_have_vgpus != bool(self.list_node_gpu_devices(node.name)):
continue # Skips nodes without VGPUS if vGPUS are required
if mdev_type and not self.node_has_vgpus_available(node.name, mdev_type):
continue # Skips nodes without free vGPUS of required type if a type is required
# Get best node using our simple weight function (basically, the less used node, but with a little more weight on CPU)
if calc_weight(node) < calc_weight(best):
best = node
# logger.debug('Node values for best: %s %f %f', node.name, node.mem / node.maxmem * 100, node.cpu)
return best if best.status == 'online' else None
@ensure_connected
def clone_machine(
self,
vmid: int,
new_vmid: int,
name: str,
description: typing.Optional[str],
as_linked_clone: bool,
use_node: typing.Optional[str] = None,
use_storage: typing.Optional[str] = None,
use_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> types.VmCreationResult:
vmInfo = self.get_machine_info(vmid)
src_node = vmInfo.node
if not use_node:
logger.debug('Selecting best node')
# If storage is not shared, must be done on same as origin
if use_storage and self.get_storage(use_storage, vmInfo.node).shared:
node = self.get_best_node_for_machine(
min_memory=-1, must_have_vgpus=must_have_vgpus, mdev_type=vmInfo.vgpu_type
)
if node is None:
raise ProxmoxError(
f'No switable node available for new vm {name} on Proxmox (check memory and VGPUS, space...)'
)
use_node = node.name
else:
use_node = src_node
# Check if mustHaveVGPUS is compatible with the node
if must_have_vgpus is not None and must_have_vgpus != bool(self.list_node_gpu_devices(use_node)):
raise ProxmoxNoGPUError(f'Node "{use_node}" does not have VGPUS and they are required')
if self.node_has_vgpus_available(use_node, vmInfo.vgpu_type):
raise ProxmoxNoGPUError(
f'Node "{use_node}" does not have free VGPUS of type {vmInfo.vgpu_type} (requred by VM {vmInfo.name})'
)
# From normal vm, disable "linked cloning"
if as_linked_clone and not vmInfo.template:
as_linked_clone = False
params: list[tuple[str, str]] = [
('newid', str(new_vmid)),
('name', name),
('target', use_node),
('full', str(int(not as_linked_clone))),
]
if description:
params.append(('description', description))
if use_storage and as_linked_clone is False:
params.append(('storage', use_storage))
if use_pool:
params.append(('pool', use_pool))
if as_linked_clone is False:
params.append(('format', 'qcow2')) # Ensure clone for templates is on qcow2 format
logger.debug('PARAMS: %s', params)
return types.VmCreationResult(
node=use_node,
vmid=new_vmid,
upid=types.UPID.from_dict(
self._post(f'nodes/{src_node}/qemu/{vmid}/clone', data=params, node=src_node)
),
)
    @ensure_connected
    @cached('hagrps', CACHE_DURATION, key_helper=caching_key_helper)
    def list_ha_groups(self, **kwargs: typing.Any) -> list[str]:
        """Returns the names of the cluster's HA groups; cached."""
        return [g['group'] for g in self._get('cluster/ha/groups')['data']]
    @ensure_connected
    def enable_machine_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
        """
        Enable high availability for a virtual machine.

        Args:
            vmid (int): The ID of the virtual machine.
            started (bool, optional): Whether the virtual machine should be started. Defaults to False.
            group (str, optional): The group to which the virtual machine belongs. Defaults to None.
        """
        self._post(
            'cluster/ha/resources',
            data=[
                ('sid', f'vm:{vmid}'),
                ('comment', 'UDS HA VM'),
                ('state', 'started' if started else 'stopped'),
                ('max_restart', '4'),
                ('max_relocate', '4'),
            ]
            + ([('group', group)] if group else []),  # Append ha group if present
        )
    @ensure_connected
    def disable_machine_ha(self, vmid: int) -> None:
        """Removes a VM from HA management. Best-effort: failures are logged, never raised."""
        try:
            # %3A is an url-encoded ':' -- the HA sid has the form "vm:<vmid>"
            self._delete(f'cluster/ha/resources/vm%3A{vmid}')
        except Exception:
            logger.exception('removeFromHA')
@ensure_connected
def set_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
params: list[tuple[str, str]] = [
('protection', str(int(protection))),
]
node = node or self.get_machine_info(vmid).node
self._post(f'nodes/{node}/qemu/{vmid}/config', data=params, node=node)
    @ensure_connected
    def get_guest_ip_address(
        self, vmid: int, node: typing.Optional[str], ip_version: typing.Literal['4', '6', ''] = ''
    ) -> str:
        """Returns the guest ip address of the specified machine

        Requires the QEMU guest agent to be running inside the VM
        (queries agent/network-get-interfaces).

        Raises:
            ProxmoxError: if the agent query fails or no suitable address is found.
        """
        try:
            node = node or self.get_machine_info(vmid).node
            ifaces_list: list[dict[str, typing.Any]] = self._get(
                f'nodes/{node}/qemu/{vmid}/agent/network-get-interfaces',
                node=node,
            )['data']['result']
            # look for first non-localhost interface with an ip address
            for iface in ifaces_list:
                if iface['name'] != 'lo' and 'ip-addresses' in iface:
                    for ip in iface['ip-addresses']:
                        if ip['ip-address'].startswith('127.'):
                            continue  # skip ipv4 loopback addresses
                        if ip_version == '4' and ip.get('ip-address-type') != 'ipv4':
                            continue
                        elif ip_version == '6' and ip.get('ip-address-type') != 'ipv6':
                            continue
                        return ip['ip-address']
        except Exception as e:
            logger.info('Error getting guest ip address for machine %s: %s', vmid, e)
            raise ProxmoxError(f'No ip address for vm {vmid}: {e}')

        raise ProxmoxError('No ip address found for vm {}'.format(vmid))
@ensure_connected
def remove_machine(self, vmid: int, node: typing.Optional[str] = None, purge: bool = True) -> types.UPID:
node = node or self.get_machine_info(vmid).node
return types.UPID.from_dict(self._delete(f'nodes/{node}/qemu/{vmid}?purge=1', node=node))
    @ensure_connected
    def list_snapshots(self, vmid: int, node: typing.Optional[str] = None) -> list[types.SnapshotInfo]:
        """Returns the snapshots of a VM; best-effort, empty list on any retrieval error."""
        node = node or self.get_machine_info(vmid).node
        try:
            return [
                types.SnapshotInfo.from_dict(s)
                for s in self._get(f'nodes/{node}/qemu/{vmid}/snapshot', node=node)['data']
            ]
        except Exception:
            return []  # If we can't get snapshots, just return empty list
    @ensure_connected
    @cached('snapshots', CACHE_DURATION, key_helper=caching_key_helper)
    def supports_snapshot(self, vmid: int, node: typing.Optional[str] = None) -> bool:
        """True when the VM can be snapshotted; cached."""
        # If machine uses tpm, snapshots are not supported
        return not self.get_machine_configuration(vmid, node).tpmstate0
@ensure_connected
def create_snapshot(
self,
vmid: int,
node: 'str|None' = None,
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
) -> types.UPID:
if self.supports_snapshot(vmid, node) is False:
raise ProxmoxError('Machine does not support snapshots')
node = node or self.get_machine_info(vmid).node
# Compose a sanitized name, without spaces and with a timestamp
name = name or f'UDS-{time.time()}'
params: list[tuple[str, str]] = [
('snapname', name),
('description', description or f'UDS Snapshot created at {time.strftime("%c")}'),
]
params.append(('snapname', name or ''))
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/snapshot', data=params, node=node))
@ensure_connected
def remove_snapshot(
self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
node = node or self.get_machine_info(vmid).node
if name is None:
raise ProxmoxError('Snapshot name is required')
return types.UPID.from_dict(self._delete(f'nodes/{node}/qemu/{vmid}/snapshot/{name}', node=node))
@ensure_connected
def restore_snapshot(
self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
node = node or self.get_machine_info(vmid).node
if name is None:
raise ProxmoxError('Snapshot name is required')
return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/snapshot/{name}/rollback', node=node))
@ensure_connected
def get_task(self, node: str, upid: str) -> types.TaskStatus:
return types.TaskStatus.from_dict(
self._get(f'nodes/{node}/tasks/{urllib.parse.quote(upid)}/status', node=node)
)
@cached('vms', CACHE_DURATION, key_helper=caching_key_helper)
@ensure_connected
def list_machines(
self, node: typing.Union[None, str, collections.abc.Iterable[str]] = None, **kwargs: typing.Any
) -> list[types.VMInfo]:
node_list: collections.abc.Iterable[str]
if node is None:
node_list = [n.name for n in self.get_cluster_info().nodes if n.online]
elif isinstance(node, str):
node_list = [node]
else:
node_list = node
result: list[types.VMInfo] = []
for node_name in node_list:
for vm in self._get(f'nodes/{node_name}/qemu', node=node_name)['data']:
vm['node'] = node_name
result.append(types.VMInfo.from_dict(vm))
return sorted(result, key=lambda x: '{}{}'.format(x.node, x.name))
    @cached('vmip', CACHE_INFO_DURATION, key_helper=caching_key_helper)
    @ensure_connected
    def get_machine_pool_info(
        self, vmid: int, poolid: typing.Optional[str], **kwargs: typing.Any
    ) -> types.VMInfo:
        """Returns VM info, first trying to locate the owning node through the pool membership."""
        # try to locate machine in pool
        node = None
        if poolid:
            try:
                for i in self._get(f'pools/{poolid}', node=node)['data']['members']:
                    try:
                        if i['vmid'] == vmid:
                            node = i['node']
                            break
                    except Exception:  # nosec: # If vmid is not present, just try next node
                        pass
            except Exception:  # nosec: # If pool is not present, just use default getVmInfo
                pass
        return self.get_machine_info(vmid, node, **kwargs)
    @ensure_connected
    @cached('vmin', CACHE_INFO_DURATION, key_helper=caching_key_helper)
    def get_machine_info(
        self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
    ) -> types.VMInfo:
        """Returns the current status info of a VM, searching every online node when none is given.

        Raises:
            ProxmoxNodeUnavailableError: if the VM was not found and some node was unreachable.
            ProxmoxNotFound: if the VM does not exist on any reachable node.
        """
        # When the caller pinpoints a node, query only that one (wrapped in a minimal Node record)
        nodes = [types.Node(node, False, False, 0, '', '', '')] if node else self.get_cluster_info().nodes
        any_node_is_down = False
        for n in nodes:
            try:
                vm = self._get(f'nodes/{n.name}/qemu/{vmid}/status/current', node=node)['data']
                vm['node'] = n.name
                return types.VMInfo.from_dict(vm)
            except ProxmoxConnectionError:
                any_node_is_down = True  # There is at least one node down when we are trying to get info
            except ProxmoxAuthError:
                raise
            except ProxmoxError:
                pass  # Any other error, ignore this node (not found in that node)
        if any_node_is_down:
            raise ProxmoxNodeUnavailableError('All nodes are down or not available')
        raise ProxmoxNotFound(f'VM {vmid} not found')
    @ensure_connected
    def get_machine_configuration(
        self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
    ) -> types.VMConfiguration:
        """Returns the configuration (hardware/options) of a VM."""
        node = node or self.get_machine_info(vmid).node
        return types.VMConfiguration.from_dict(self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data'])
    @ensure_connected
    def set_machine_mac(
        self,
        vmid: int,
        mac: str,
        netid: typing.Optional[str] = None,
        node: typing.Optional[str] = None,
    ) -> None:
        """Sets the MAC address of a VM network interface.

        When netid is not given (or not present in the config), the first
        'netX' interface found is used.

        Raises:
            ProxmoxError: if the VM has no network interface.
        """
        node = node or self.get_machine_info(vmid).node
        # First, read current configuration and extract network configuration
        config = self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data']
        if netid not in config:
            # Get first network interface (netX where X is a number)
            netid = next((k for k in config if k.startswith('net') and k[3:].isdigit()), None)
        if not netid:
            raise ProxmoxError('No network interface found')
        netdata = config[netid]
        # Update mac address, that is the first field <model>=<mac>,<other options>
        netdata = re.sub(r'^([^=]+)=([^,]+),', r'\1={},'.format(mac), netdata)
        logger.debug('Updating mac address for VM %s: %s=%s', vmid, netid, netdata)
        self._post(
            f'nodes/{node}/qemu/{vmid}/config',
            data=[(netid, netdata)],
            node=node,
        )
    @ensure_connected
    def start_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
        """Starts a VM, returning the start task UPID."""
        # if exitstatus is "OK" or contains "already running", all is fine
        node = node or self.get_machine_info(vmid).node
        return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/start', node=node))
    @ensure_connected
    def stop_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
        """Hard-stops a VM (no guest cooperation), returning the stop task UPID."""
        node = node or self.get_machine_info(vmid).node
        return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/stop', node=node))
    @ensure_connected
    def reset_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
        """Resets a VM (hard reset), returning the reset task UPID."""
        node = node or self.get_machine_info(vmid).node
        return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/reset', node=node))
    @ensure_connected
    def suspend_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
        """'Suspends' a VM. Currently delegates to a clean shutdown (see note)."""
        # Note: a real Proxmox suspend would set the machine state to "paused";
        # we intentionally perform a shutdown instead (commented code below kept for reference)
        return self.shutdown_machine(vmid, node)
        # node = node or self.get_machine_info(vmid).node
        # return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/suspend', node=node))
    @ensure_connected
    def shutdown_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
        """Requests a clean (guest-cooperative) shutdown of a VM, returning the task UPID."""
        # if exitstatus is "OK" or contains "already running", all is fine
        node = node or self.get_machine_info(vmid).node
        return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/shutdown', node=node))
    @ensure_connected
    def convert_to_template(self, vmid: int, node: typing.Optional[str] = None) -> None:
        """Converts a VM into a template and refreshes the cached VM info."""
        node = node or self.get_machine_info(vmid).node
        self._post(f'nodes/{node}/qemu/{vmid}/template', node=node)
        # Ensure cache is reset for this VM (as it is now a template)
        self.get_machine_info(vmid, force=True)
# proxmox has a "resume", but start works for suspended vm so we use it
resume_machine = start_machine
    @ensure_connected
    @cached('storage', CACHE_DURATION, key_helper=caching_key_helper)
    def get_storage(self, storage: str, node: str, **kwargs: typing.Any) -> types.StorageInfo:
        """Returns the status of a storage on a node; cached."""
        return types.StorageInfo.from_dict(
            self._get(f'nodes/{node}/storage/{urllib.parse.quote(storage)}/status', node=node)['data']
        )
    @ensure_connected
    @cached('storages', CACHE_DURATION, key_helper=caching_key_helper)
    def list_storages(
        self,
        node: typing.Union[None, str, collections.abc.Iterable[str]] = None,
        content: typing.Optional[str] = None,
        **kwargs: typing.Any,
    ) -> list[types.StorageInfo]:
        """We use a list for storage instead of an iterator, so we can cache it...

        Lists storages of the given node(s) -- every online node when None --
        optionally filtered by content type.
        """
        nodes: collections.abc.Iterable[str]
        if node is None:
            nodes = [n.name for n in self.get_cluster_info().nodes if n.online]
        elif isinstance(node, str):
            nodes = [node]
        else:
            nodes = node
        params = '' if not content else '?content={}'.format(urllib.parse.quote(content))
        result: list[types.StorageInfo] = []
        for node_name in nodes:
            for storage in self._get(f'nodes/{node_name}/storage{params}', node=node_name)['data']:
                storage['node'] = node_name
                # API returns content as a comma-separated string; normalize to a list
                storage['content'] = storage['content'].split(',')
                result.append(types.StorageInfo.from_dict(storage))
        return result
    @ensure_connected
    @cached('nodeStats', CACHE_INFO_DURATION, key_helper=caching_key_helper)
    def get_node_stats(self, **kwargs: typing.Any) -> list[types.NodeStats]:
        """Returns usage statistics (cpu, memory, disk, ...) of every cluster node; briefly cached."""
        return [
            types.NodeStats.from_dict(nodeStat) for nodeStat in self._get('cluster/resources?type=node')['data']
        ]
    @ensure_connected
    @cached('pools', CACHE_DURATION // 6, key_helper=caching_key_helper)
    def list_pools(self, **kwargs: typing.Any) -> list[types.PoolInfo]:
        """Returns every resource pool; cached for a short period."""
        return [types.PoolInfo.from_dict(poolInfo) for poolInfo in self._get('pools')['data']]
@ensure_connected
@cached('pool', CACHE_DURATION, key_helper=caching_key_helper)
def get_pool_info(
self, pool_id: str, retrieve_vm_names: bool = False, **kwargs: typing.Any
) -> types.PoolInfo:
pool_info = types.PoolInfo.from_dict(self._get(f'pools/{pool_id}')['data'])
if retrieve_vm_names:
for i in range(len(pool_info.members)):
try:
pool_info.members[i] = pool_info.members[i]._replace(
vmname=self.get_machine_info(pool_info.members[i].vmid).name or ''
)
except Exception:
pool_info.members[i] = pool_info.members[i]._replace(
vmname=f'VM-{pool_info.members[i].vmid}'
)
return pool_info
    @ensure_connected
    def get_console_connection(
        self, vmid: int, node: typing.Optional[str] = None
    ) -> typing.Optional[core_types.services.ConsoleConnectionInfo]:
        """
        Gets the SPICE console connection info for the specified machine.

        Returns the proxy, host, ports, cert subject and one-shot ticket needed
        to open a SPICE session (see sample response below).
        """
        node = node or self.get_machine_info(vmid).node
        res: dict[str, typing.Any] = self._post(f'nodes/{node}/qemu/{vmid}/spiceproxy', node=node)['data']
        return core_types.services.ConsoleConnectionInfo(
            type=res['type'],
            proxy=res['proxy'],
            address=res['host'],
            port=res.get('port', None),
            secure_port=res['tls-port'],
            cert_subject=res['host-subject'],
            ticket=core_types.services.ConsoleConnectionTicket(value=res['password']),
            ca=res.get('ca', None),
        )
        # Sample data:
        # 'data': {'proxy': 'http://pvealone.dkmon.com:3128',
        # 'release-cursor': 'Ctrl+Alt+R',
        # 'host': 'pvespiceproxy:63489cf9:101:pvealone::c934cf7f7570012bbebab9e1167402b6471aae16',
        # 'delete-this-file': 1,
        # 'secure-attention': 'Ctrl+Alt+Ins',
        # 'title': 'VM 101 - VM-1',
        # 'password': '31a189dd71ce859867e28dd68ba166a701e77eed',
        # 'type': 'spice',
        # 'toggle-fullscreen': 'Shift+F11',
        # 'host-subject': 'OU=PVE Cluster Node,O=Proxmox Virtual Environment,CN=pvealone.dkmon.com',
        # 'tls-port': 61000,
        # 'ca': '-----BEGIN CERTIFICATE-----\\n......\\n-----END CERTIFICATE-----\\n'}}
@ensure_connected
def journal(self, node: str, lastentries: int = 4, **kwargs: typing.Any) -> list[str]:
try:
return self._get(f'nodes/{node}/journal?lastentries={lastentries}')['data']
except Exception:
return []

View File

@ -0,0 +1,777 @@
#
# Copyright (c) 2019-2021 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import re
import time
import typing
import urllib.parse
import logging
from uds.core import types as core_types
from uds.core.util import security
from uds.core.util.cache import Cache
from uds.core.util.decorators import cached, ensure_connected
from . import types, consts, exceptions
import requests
logger = logging.getLogger(__name__)
# caching helper: partition cached results per connected host
def caching_key_helper(obj: 'ProxmoxClient') -> str:
    """Returns the cache key for a client, which is simply its host."""
    return obj._host  # pylint: disable=protected-access
class ProxmoxClient:
    """Thin REST client for the Proxmox VE API (api2/json) using ticket-based auth and optional caching."""

    _host: str  # Proxmox host name or ip
    _port: int
    _credentials: tuple[tuple[str, str], tuple[str, str]]  # form fields: (('username', ...), ('password', ...))
    _url: str  # base api url, derived from host and port
    _validate_cert: bool
    _timeout: int
    _ticket: str  # PVEAuthCookie value; empty until connect()
    _csrf: str  # CSRF prevention token, sent on write requests

    cache: typing.Optional['Cache']
    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        timeout: int = 5,
        validate_certificate: bool = False,
        cache: typing.Optional['Cache'] = None,
    ) -> None:
        """Stores connection parameters; no network activity happens until connect()."""
        self._host = host
        self._port = port
        self._credentials = (('username', username), ('password', password))
        self._validate_cert = validate_certificate
        self._timeout = timeout
        self._url = 'https://{}:{}/api2/json/'.format(self._host, self._port)
        self.cache = cache
        # Empty until connect() fills them in
        self._ticket = ''
        self._csrf = ''
    @property
    def headers(self) -> dict[str, str]:
        """Common request headers; the CSRF token is the empty string before connect()."""
        return {
            'Accept': 'application/json',
            'Content-Type': 'application/x-www-form-urlencoded',
            'CSRFPreventionToken': self._csrf,
        }
    def ensure_correct(self, response: 'requests.Response', *, node: typing.Optional[str]) -> typing.Any:
        """Validates an API response, mapping HTTP errors to Proxmox exceptions; returns the parsed json.

        On a 500 with a known node, the node's journal is inspected to enrich the message.
        """
        if not response.ok:
            logger.debug('Error on request %s: %s', response.status_code, response.content)
            error_message = 'Status code {}'.format(response.status_code)
            if response.status_code == 595:
                raise exceptions.ProxmoxNodeUnavailableError(response.content.decode('utf8'))
            if response.status_code == 403:
                raise exceptions.ProxmoxAuthError(response.content.decode('utf8'))
            if response.status_code == 400:
                try:
                    error_message = 'Errors on request: {}'.format(response.json()['errors'])
                except Exception:  # nosec: No json or no errors, use default msg
                    pass
            if response.status_code == 500 and node:
                # Try to get from journal
                try:
                    journal = [x for x in filter(lambda x: 'failed' in x, self.journal(node, 4))]
                    logger.error('Proxmox error 500:')
                    for line in journal:
                        logger.error(' * %s', line)
                    error_message = f'Error 500 on request: {" ## ".join(journal)}'
                except Exception:
                    pass  # If we can't get journal, just use default message
            raise exceptions.ProxmoxError(error_message)
        return response.json()
def _compose_url_for(self, path: str) -> str:
return self._url + path
    def _get(self, path: str, *, node: typing.Optional[str] = None) -> typing.Any:
        """Issues an authenticated GET; `node` only enriches error reporting."""
        try:
            result = security.secure_requests_session(verify=self._validate_cert).get(
                self._compose_url_for(path),
                headers=self.headers,
                cookies={'PVEAuthCookie': self._ticket},
                timeout=self._timeout,
            )
            logger.debug('GET result to %s: %s -- %s', path, result.status_code, result.content)
        except requests.ConnectionError as e:
            # Network-level failure
            raise exceptions.ProxmoxConnectionError(str(e))
        return self.ensure_correct(result, node=node)
    def _post(
        self,
        path: str,
        data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
        *,
        node: typing.Optional[str] = None,
    ) -> typing.Any:
        """Issues an authenticated POST with form-encoded data; `node` only enriches error reporting."""
        try:
            result = security.secure_requests_session(verify=self._validate_cert).post(
                self._compose_url_for(path),
                data=data,  # type: ignore
                headers=self.headers,
                cookies={'PVEAuthCookie': self._ticket},
                timeout=self._timeout,
            )
            logger.debug('POST result to %s: %s -- %s', path, result.status_code, result.content)
        except requests.ConnectionError as e:
            # Network-level failure
            raise exceptions.ProxmoxConnectionError(str(e))
        return self.ensure_correct(result, node=node)
    def _delete(
        self,
        path: str,
        data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
        *,
        node: typing.Optional[str] = None,
    ) -> typing.Any:
        """Issues an authenticated DELETE; `node` only enriches error reporting."""
        try:
            result = security.secure_requests_session(verify=self._validate_cert).delete(
                self._compose_url_for(path),
                data=data,  # type: ignore
                headers=self.headers,
                cookies={'PVEAuthCookie': self._ticket},
                timeout=self._timeout,
            )
            logger.debug(
                'DELETE result to %s: %s -- %s -- %s',
                path,
                result.status_code,
                result.content,
                result.headers,
            )
        except requests.ConnectionError as e:
            # Network-level failure
            raise exceptions.ProxmoxConnectionError(str(e))
        return self.ensure_correct(result, node=node)
    def connect(self, force: bool = False) -> None:
        """Authenticates against the API, obtaining the auth ticket and CSRF token.

        The pair is cached for 30 minutes; force=True skips the cache.
        """
        if self._ticket:
            return  # Already connected

        # we could cache this for a while, we know that at least for 30 minutes
        if self.cache and not force:
            dc = self.cache.get(self._host + 'conn')
            if dc:  # Stored on cache
                self._ticket, self._csrf = dc
                return
        try:
            result = security.secure_requests_session(verify=self._validate_cert).post(
                url=self._compose_url_for('access/ticket'),
                data=self._credentials,
                headers=self.headers,
                timeout=self._timeout,
            )
            if not result.ok:
                raise exceptions.ProxmoxAuthError(result.content.decode('utf8'))
            data = result.json()['data']
            self._ticket = data['ticket']
            self._csrf = data['CSRFPreventionToken']
            if self.cache:
                self.cache.put(self._host + 'conn', (self._ticket, self._csrf), validity=1800)  # 30 minutes
        except requests.RequestException as e:
            raise exceptions.ProxmoxConnectionError(str(e)) from e
def test(self) -> bool:
try:
self.connect()
except Exception:
# logger.error('Error testing proxmox: %s', e)
return False
return True
    @ensure_connected
    @cached('cluster', consts.CACHE_DURATION, key_helper=caching_key_helper)
    def get_cluster_info(self, **kwargs: typing.Any) -> types.ClusterInfo:
        """Returns cluster status information (cluster data and node list); cached."""
        return types.ClusterInfo.from_dict(self._get('cluster/status'))
    @ensure_connected
    def get_next_vmid(self) -> int:
        """Asks the cluster for the next free VM id."""
        return int(self._get('cluster/nextid')['data'])
@ensure_connected
def is_vmid_available(self, vmid: int) -> bool:
try:
self._get(f'cluster/nextid?vmid={vmid}')
except Exception: # Not available
return False
return True
    @ensure_connected
    @cached('nodeNets', consts.CACHE_DURATION, args=1, kwargs=['node'], key_helper=caching_key_helper)
    def get_node_networks(self, node: str, **kwargs: typing.Any) -> typing.Any:
        """Returns the raw network configuration entries of a node; cached per node."""
        return self._get(f'nodes/{node}/network', node=node)['data']
    # pylint: disable=unused-argument
    @ensure_connected
    @cached('nodeGpuDevices', consts.CACHE_DURATION_LONG, key_helper=caching_key_helper)
    def list_node_gpu_devices(self, node: str, **kwargs: typing.Any) -> list[str]:
        """Returns the ids of the node's PCI devices that support mediated devices (vGPU capable); cached."""
        return [
            device['id']
            for device in self._get(f'nodes/{node}/hardware/pci', node=node)['data']
            if device.get('mdev')
        ]
    @ensure_connected
    def list_node_vgpus(self, node: str, **kwargs: typing.Any) -> list[types.VGPUInfo]:
        """Returns every vGPU (mdev) exposed by each vGPU-capable PCI device of the node."""
        return [
            types.VGPUInfo.from_dict(gpu)
            for device in self.list_node_gpu_devices(node)
            for gpu in self._get(f'nodes/{node}/hardware/pci/{device}/mdev', node=node)['data']
        ]
@ensure_connected
def node_has_vgpus_available(
self, node: str, vgpu_type: typing.Optional[str], **kwargs: typing.Any
) -> bool:
return any(
gpu.available and (vgpu_type is None or gpu.type == vgpu_type) for gpu in self.list_node_vgpus(node)
)
@ensure_connected
def get_best_node_for_machine(
self,
min_memory: int = 0,
must_have_vgpus: typing.Optional[bool] = None,
mdev_type: typing.Optional[str] = None,
) -> typing.Optional[types.NodeStats]:
'''
Returns the best node to create a VM on
Args:
minMemory (int, optional): Minimum memory required. Defaults to 0.
mustHaveVGPUS (typing.Optional[bool], optional): If the node must have VGPUS. True, False or None (don't care). Defaults to None.
'''
best = types.NodeStats.null()
node: types.NodeStats
# Function to calculate the weight of a node
def calc_weight(x: types.NodeStats) -> float:
return (x.mem / x.maxmem) + (x.cpu / x.maxcpu) * 1.3
# Offline nodes are not "the best"
for node in filter(lambda x: x.status == 'online', self.get_node_stats()):
if min_memory and node.mem < min_memory + 512000000: # 512 MB reserved
continue # Skips nodes with not enouhg memory
if must_have_vgpus is not None and must_have_vgpus != bool(self.list_node_gpu_devices(node.name)):
continue # Skips nodes without VGPUS if vGPUS are required
if mdev_type and not self.node_has_vgpus_available(node.name, mdev_type):
continue # Skips nodes without free vGPUS of required type if a type is required
# Get best node using our simple weight function (basically, the less used node, but with a little more weight on CPU)
if calc_weight(node) < calc_weight(best):
best = node
# logger.debug('Node values for best: %s %f %f', node.name, node.mem / node.maxmem * 100, node.cpu)
return best if best.status == 'online' else None
@ensure_connected
def clone_machine(
self,
vmid: int,
new_vmid: int,
name: str,
description: typing.Optional[str],
as_linked_clone: bool,
use_node: typing.Optional[str] = None,
use_storage: typing.Optional[str] = None,
use_pool: typing.Optional[str] = None,
must_have_vgpus: typing.Optional[bool] = None,
) -> types.VmCreationResult:
vmInfo = self.get_machine_info(vmid)
src_node = vmInfo.node
if not use_node:
logger.debug('Selecting best node')
# If storage is not shared, must be done on same as origin
if use_storage and self.get_storage(use_storage, vmInfo.node).shared:
node = self.get_best_node_for_machine(
min_memory=-1, must_have_vgpus=must_have_vgpus, mdev_type=vmInfo.vgpu_type
)
if node is None:
raise exceptions.ProxmoxError(
f'No switable node available for new vm {name} on Proxmox (check memory and VGPUS, space...)'
)
use_node = node.name
else:
use_node = src_node
# Check if mustHaveVGPUS is compatible with the node
if must_have_vgpus is not None and must_have_vgpus != bool(self.list_node_gpu_devices(use_node)):
raise exceptions.ProxmoxNoGPUError(f'Node "{use_node}" does not have VGPUS and they are required')
if self.node_has_vgpus_available(use_node, vmInfo.vgpu_type):
raise exceptions.ProxmoxNoGPUError(
f'Node "{use_node}" does not have free VGPUS of type {vmInfo.vgpu_type} (requred by VM {vmInfo.name})'
)
# From normal vm, disable "linked cloning"
if as_linked_clone and not vmInfo.template:
as_linked_clone = False
params: list[tuple[str, str]] = [
('newid', str(new_vmid)),
('name', name),
('target', use_node),
('full', str(int(not as_linked_clone))),
]
if description:
params.append(('description', description))
if use_storage and as_linked_clone is False:
params.append(('storage', use_storage))
if use_pool:
params.append(('pool', use_pool))
if as_linked_clone is False:
params.append(('format', 'qcow2')) # Ensure clone for templates is on qcow2 format
logger.debug('PARAMS: %s', params)
return types.VmCreationResult(
node=use_node,
vmid=new_vmid,
upid=types.UPID.from_dict(
self._post(f'nodes/{src_node}/qemu/{vmid}/clone', data=params, node=src_node)
),
)
    @ensure_connected
    @cached('hagrps', consts.CACHE_DURATION, key_helper=caching_key_helper)
    def list_ha_groups(self, **kwargs: typing.Any) -> list[str]:
        """Returns the names of the cluster's HA groups; cached."""
        return [g['group'] for g in self._get('cluster/ha/groups')['data']]
    @ensure_connected
    def enable_machine_ha(self, vmid: int, started: bool = False, group: typing.Optional[str] = None) -> None:
        """
        Enable high availability for a virtual machine.

        Args:
            vmid (int): The ID of the virtual machine.
            started (bool, optional): Whether the virtual machine should be started. Defaults to False.
            group (str, optional): The group to which the virtual machine belongs. Defaults to None.
        """
        self._post(
            'cluster/ha/resources',
            data=[
                ('sid', f'vm:{vmid}'),
                ('comment', 'UDS HA VM'),
                ('state', 'started' if started else 'stopped'),
                ('max_restart', '4'),
                ('max_relocate', '4'),
            ]
            + ([('group', group)] if group else []),  # Append ha group if present
        )
    @ensure_connected
    def disable_machine_ha(self, vmid: int) -> None:
        """Removes a VM from HA management. Best-effort: failures are logged, never raised."""
        try:
            # %3A is an url-encoded ':' -- the HA sid has the form "vm:<vmid>"
            self._delete(f'cluster/ha/resources/vm%3A{vmid}')
        except Exception:
            logger.exception('removeFromHA')
@ensure_connected
def set_protection(self, vmid: int, node: typing.Optional[str] = None, protection: bool = False) -> None:
    """Set or clear the 'protection' flag of a VM configuration."""
    target_node = node or self.get_machine_info(vmid).node
    self._post(
        f'nodes/{target_node}/qemu/{vmid}/config',
        data=[('protection', str(int(protection)))],
        node=target_node,
    )
@ensure_connected
def get_guest_ip_address(
    self, vmid: int, node: typing.Optional[str], ip_version: typing.Literal['4', '6', ''] = ''
) -> str:
    """Returns the guest ip address of the specified machine.

    Args:
        vmid: The ID of the virtual machine.
        node: Node hosting the VM; looked up via get_machine_info when None.
        ip_version: '4' or '6' to restrict the address family, '' for any.

    Returns:
        The first non-loopback address reported by the QEMU guest agent.

    Raises:
        ProxmoxError: If the agent query fails or no suitable address exists.
    """
    try:
        node = node or self.get_machine_info(vmid).node
        ifaces_list: list[dict[str, typing.Any]] = self._get(
            f'nodes/{node}/qemu/{vmid}/agent/network-get-interfaces',
            node=node,
        )['data']['result']
        # look for first non-localhost interface with an ip address
        for iface in ifaces_list:
            if iface['name'] != 'lo' and 'ip-addresses' in iface:
                for ip in iface['ip-addresses']:
                    # Fix: skip IPv6 loopback ('::1') as well, not only IPv4 127.x addresses
                    if ip['ip-address'].startswith('127.') or ip['ip-address'] == '::1':
                        continue
                    if ip_version == '4' and ip.get('ip-address-type') != 'ipv4':
                        continue
                    elif ip_version == '6' and ip.get('ip-address-type') != 'ipv6':
                        continue
                    return ip['ip-address']
    except Exception as e:
        logger.info('Error getting guest ip address for machine %s: %s', vmid, e)
        raise exceptions.ProxmoxError(f'No ip address for vm {vmid}: {e}')

    raise exceptions.ProxmoxError('No ip address found for vm {}'.format(vmid))
@ensure_connected
def remove_machine(self, vmid: int, node: typing.Optional[str] = None, purge: bool = True) -> types.UPID:
    """Delete a VM, optionally purging it from job configurations and backups.

    Fix: the ``purge`` parameter was previously ignored (the URL always
    contained ``?purge=1``); it is now honored.
    """
    node = node or self.get_machine_info(vmid).node
    url = f'nodes/{node}/qemu/{vmid}' + ('?purge=1' if purge else '')
    return types.UPID.from_dict(self._delete(url, node=node))
@ensure_connected
def list_snapshots(self, vmid: int, node: typing.Optional[str] = None) -> list[types.SnapshotInfo]:
    """List the snapshots of a VM; empty list if they cannot be retrieved."""
    node = node or self.get_machine_info(vmid).node
    try:
        raw_snapshots = self._get(f'nodes/{node}/qemu/{vmid}/snapshot', node=node)['data']
        return [types.SnapshotInfo.from_dict(snapshot) for snapshot in raw_snapshots]
    except Exception:
        return []  # If we can't get snapshots, just return empty list
@ensure_connected
@cached('snapshots', consts.CACHE_DURATION, key_helper=caching_key_helper)
def supports_snapshot(self, vmid: int, node: typing.Optional[str] = None) -> bool:
    """Return whether snapshots can be taken of the given VM."""
    # If machine uses tpm, snapshots are not supported
    return not self.get_machine_configuration(vmid, node).tpmstate0
@ensure_connected
def create_snapshot(
    self,
    vmid: int,
    node: 'str|None' = None,
    name: typing.Optional[str] = None,
    description: typing.Optional[str] = None,
) -> types.UPID:
    """Create a snapshot of a VM.

    Args:
        vmid: The ID of the virtual machine.
        node: Node hosting the VM; looked up when None.
        name: Snapshot name; defaults to a timestamped 'UDS-...' name.
        description: Snapshot description; defaults to a timestamped message.

    Raises:
        ProxmoxError: If the VM does not support snapshots (e.g. uses TPM).
    """
    if self.supports_snapshot(vmid, node) is False:
        raise exceptions.ProxmoxError('Machine does not support snapshots')

    node = node or self.get_machine_info(vmid).node
    # Compose a sanitized name, without spaces and with a timestamp
    name = name or f'UDS-{time.time()}'
    # Fix: 'snapname' was previously appended twice (duplicate parameter removed)
    params: list[tuple[str, str]] = [
        ('snapname', name),
        ('description', description or f'UDS Snapshot created at {time.strftime("%c")}'),
    ]
    return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/snapshot', data=params, node=node))
@ensure_connected
def remove_snapshot(
    self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
    """Delete an existing snapshot of a VM.

    Raises:
        ProxmoxError: if no snapshot name is provided.
    """
    node = node or self.get_machine_info(vmid).node
    if name is None:
        raise exceptions.ProxmoxError('Snapshot name is required')
    response = self._delete(f'nodes/{node}/qemu/{vmid}/snapshot/{name}', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def restore_snapshot(
    self, vmid: int, node: 'str|None' = None, name: typing.Optional[str] = None
) -> types.UPID:
    """Roll a VM back to the named snapshot.

    Raises:
        ProxmoxError: if no snapshot name is provided.
    """
    node = node or self.get_machine_info(vmid).node
    if name is None:
        raise exceptions.ProxmoxError('Snapshot name is required')
    response = self._post(f'nodes/{node}/qemu/{vmid}/snapshot/{name}/rollback', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def get_task(self, node: str, upid: str) -> types.TaskStatus:
    """Return the status of an asynchronous Proxmox task identified by its UPID."""
    # UPIDs contain ':' characters, so they must be url-quoted for the path
    return types.TaskStatus.from_dict(
        self._get(f'nodes/{node}/tasks/{urllib.parse.quote(upid)}/status', node=node)
    )
@cached('vms', consts.CACHE_DURATION, key_helper=caching_key_helper)
@ensure_connected
def list_machines(
    self, node: typing.Union[None, str, collections.abc.Iterable[str]] = None, **kwargs: typing.Any
) -> list[types.VMInfo]:
    """List the VMs on the given node(s); all online cluster nodes when node is None."""
    if node is None:
        node_names: collections.abc.Iterable[str] = [
            n.name for n in self.get_cluster_info().nodes if n.online
        ]
    elif isinstance(node, str):
        node_names = [node]
    else:
        node_names = node

    found: list[types.VMInfo] = []
    for node_name in node_names:
        for vm in self._get(f'nodes/{node_name}/qemu', node=node_name)['data']:
            vm['node'] = node_name  # API response does not carry the node; inject it
            found.append(types.VMInfo.from_dict(vm))

    # Stable ordering: by node name, then VM name
    return sorted(found, key=lambda x: '{}{}'.format(x.node, x.name))
@cached('vmip', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
@ensure_connected
def get_machine_pool_info(
    self, vmid: int, poolid: typing.Optional[str], **kwargs: typing.Any
) -> types.VMInfo:
    """Get info of a VM, using pool membership (if given) to locate its node first.

    If the pool or the VM cannot be found among the pool members, falls back
    to a plain get_machine_info lookup across the cluster.
    """
    # try to locate machine in pool
    node = None
    if poolid:
        try:
            for i in self._get(f'pools/{poolid}', node=node)['data']['members']:
                try:
                    if i['vmid'] == vmid:
                        node = i['node']
                        break
                except Exception:  # nosec: # If vmid is not present, just try next node
                    pass
        except Exception:  # nosec: # If pool is not present, just use default getVmInfo
            pass
    return self.get_machine_info(vmid, node, **kwargs)
@ensure_connected
@cached('vmin', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def get_machine_info(
    self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
) -> types.VMInfo:
    """Get current status info of a VM, scanning all cluster nodes if no node is given.

    Raises:
        ProxmoxAuthError: re-raised immediately (no point in trying other nodes).
        ProxmoxNodeUnavailableError: VM not found and at least one node was
            unreachable (the VM might live on a down node).
        ProxmoxNotFound: no node reported the VM and all nodes answered.
    """
    # Synthesize a single-node list when a node was given; only 'name' matters here
    nodes = [types.Node(node, False, False, 0, '', '', '')] if node else self.get_cluster_info().nodes
    any_node_is_down = False
    for n in nodes:
        try:
            vm = self._get(f'nodes/{n.name}/qemu/{vmid}/status/current', node=node)['data']
            vm['node'] = n.name
            return types.VMInfo.from_dict(vm)
        except exceptions.ProxmoxConnectionError:
            any_node_is_down = True  # There is at least one node down when we are trying to get info
        except exceptions.ProxmoxAuthError:
            raise
        except exceptions.ProxmoxError:
            pass  # Any other error, ignore this node (not found in that node)

    if any_node_is_down:
        raise exceptions.ProxmoxNodeUnavailableError('All nodes are down or not available')

    raise exceptions.ProxmoxNotFound(f'VM {vmid} not found')
@ensure_connected
def get_machine_configuration(
    self, vmid: int, node: typing.Optional[str] = None, **kwargs: typing.Any
) -> types.VMConfiguration:
    """Fetch the configuration (hardware, networks, ...) of a VM."""
    node = node or self.get_machine_info(vmid).node
    config_data = self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data']
    return types.VMConfiguration.from_dict(config_data)
@ensure_connected
def set_machine_mac(
    self,
    vmid: int,
    mac: str,
    netid: typing.Optional[str] = None,
    node: typing.Optional[str] = None,
) -> None:
    """Set the MAC address of one of a VM's network interfaces.

    Args:
        vmid: The ID of the virtual machine.
        mac: MAC address to assign.
        netid: Config key of the NIC ('net0', 'net1', ...); when None or not
            present in the config, the first NIC found is used.
        node: Node hosting the VM; looked up when None.

    Raises:
        ProxmoxError: if the VM has no network interface at all.
    """
    node = node or self.get_machine_info(vmid).node
    # First, read current configuration and extract network configuration
    config = self._get(f'nodes/{node}/qemu/{vmid}/config', node=node)['data']
    if netid not in config:
        # Get first network interface (netX where X is a number)
        netid = next((k for k in config if k.startswith('net') and k[3:].isdigit()), None)
    if not netid:
        raise exceptions.ProxmoxError('No network interface found')

    netdata = config[netid]

    # Update mac address, that is the first field <model>=<mac>,<other options>
    netdata = re.sub(r'^([^=]+)=([^,]+),', r'\1={},'.format(mac), netdata)

    logger.debug('Updating mac address for VM %s: %s=%s', vmid, netid, netdata)

    self._post(
        f'nodes/{node}/qemu/{vmid}/config',
        data=[(netid, netdata)],
        node=node,
    )
@ensure_connected
def start_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
    """Start a VM. An "already running" exit status is also considered fine."""
    node = node or self.get_machine_info(vmid).node
    response = self._post(f'nodes/{node}/qemu/{vmid}/status/start', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def stop_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
    """Hard-stop a VM (no guest shutdown)."""
    node = node or self.get_machine_info(vmid).node
    response = self._post(f'nodes/{node}/qemu/{vmid}/status/stop', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def reset_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
    """Reset (hard reboot) a VM."""
    node = node or self.get_machine_info(vmid).node
    response = self._post(f'nodes/{node}/qemu/{vmid}/status/reset', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def suspend_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
    """Suspend a VM. Currently implemented as a guest shutdown (see note)."""
    # Note: Proxmox "suspend" sets the machine state to "paused"; a shutdown is
    # used here instead. The original suspend call is kept below for reference.
    return self.shutdown_machine(vmid, node)
    # node = node or self.get_machine_info(vmid).node
    # return types.UPID.from_dict(self._post(f'nodes/{node}/qemu/{vmid}/status/suspend', node=node))
@ensure_connected
def shutdown_machine(self, vmid: int, node: typing.Optional[str] = None) -> types.UPID:
    """Ask the guest OS to shut down. An "already stopped" status is also fine."""
    node = node or self.get_machine_info(vmid).node
    response = self._post(f'nodes/{node}/qemu/{vmid}/status/shutdown', node=node)
    return types.UPID.from_dict(response)
@ensure_connected
def convert_to_template(self, vmid: int, node: typing.Optional[str] = None) -> None:
    """Convert an existing VM into a template."""
    target_node = node or self.get_machine_info(vmid).node
    self._post(f'nodes/{target_node}/qemu/{vmid}/template', node=target_node)
    # Ensure cache is reset for this VM (as it is now a template)
    self.get_machine_info(vmid, force=True)
# proxmox has a "resume" endpoint, but "start" also works for suspended VMs,
# so resume is simply an alias of start_machine
resume_machine = start_machine
@ensure_connected
@cached('storage', consts.CACHE_DURATION, key_helper=caching_key_helper)
def get_storage(self, storage: str, node: str, **kwargs: typing.Any) -> types.StorageInfo:
    """Return status information for a single storage on a node."""
    storage_path = urllib.parse.quote(storage)
    status = self._get(f'nodes/{node}/storage/{storage_path}/status', node=node)['data']
    return types.StorageInfo.from_dict(status)
@ensure_connected
@cached('storages', consts.CACHE_DURATION, key_helper=caching_key_helper)
def list_storages(
    self,
    node: typing.Union[None, str, collections.abc.Iterable[str]] = None,
    content: typing.Optional[str] = None,
    **kwargs: typing.Any,
) -> list[types.StorageInfo]:
    """We use a list for storage instead of an iterator, so we can cache it..."""
    if node is None:
        node_names: collections.abc.Iterable[str] = [
            n.name for n in self.get_cluster_info().nodes if n.online
        ]
    elif isinstance(node, str):
        node_names = [node]
    else:
        node_names = node

    # Optional content-type filter (e.g. 'images', 'iso', ...)
    query = '?content={}'.format(urllib.parse.quote(content)) if content else ''

    found: list[types.StorageInfo] = []
    for node_name in node_names:
        for storage in self._get(f'nodes/{node_name}/storage{query}', node=node_name)['data']:
            storage['node'] = node_name  # inject node, not present in response
            storage['content'] = storage['content'].split(',')
            found.append(types.StorageInfo.from_dict(storage))
    return found
@ensure_connected
@cached('nodeStats', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def get_node_stats(self, **kwargs: typing.Any) -> list[types.NodeStats]:
    """Return resource statistics for every node in the cluster."""
    raw_stats = self._get('cluster/resources?type=node')['data']
    return [types.NodeStats.from_dict(stat) for stat in raw_stats]
@ensure_connected
@cached('pools', consts.CACHE_DURATION // 6, key_helper=caching_key_helper)
def list_pools(self, **kwargs: typing.Any) -> list[types.PoolInfo]:
    """Return information about every resource pool."""
    raw_pools = self._get('pools')['data']
    return [types.PoolInfo.from_dict(pool) for pool in raw_pools]
@ensure_connected
@cached('pool', consts.CACHE_DURATION, key_helper=caching_key_helper)
def get_pool_info(
    self, pool_id: str, retrieve_vm_names: bool = False, **kwargs: typing.Any
) -> types.PoolInfo:
    """Return information about a resource pool.

    Args:
        pool_id: Identifier of the pool to query.
        retrieve_vm_names: When True, each member is enriched with its VM name
            (an extra lookup per member); a synthetic 'VM-<vmid>' name is used
            if the lookup fails.
    """
    pool_info = types.PoolInfo.from_dict(self._get(f'pools/{pool_id}')['data'])
    if retrieve_vm_names:
        # Members are NamedTuples, so replace each entry with an updated copy
        for i in range(len(pool_info.members)):
            try:
                pool_info.members[i] = pool_info.members[i]._replace(
                    vmname=self.get_machine_info(pool_info.members[i].vmid).name or ''
                )
            except Exception:
                # Lookup failed; fall back to a synthetic name so callers get something
                pool_info.members[i] = pool_info.members[i]._replace(
                    vmname=f'VM-{pool_info.members[i].vmid}'
                )
    return pool_info
@ensure_connected
def get_console_connection(
    self, vmid: int, node: typing.Optional[str] = None
) -> typing.Optional[core_types.services.ConsoleConnectionInfo]:
    """
    Gets the SPICE console connection info for the specified machine.

    Args:
        vmid: The ID of the virtual machine.
        node: Node hosting the VM; looked up when None.

    Returns:
        Console connection info built from the spiceproxy response
        (see the sample response below for the fields involved).
    """
    node = node or self.get_machine_info(vmid).node
    res: dict[str, typing.Any] = self._post(f'nodes/{node}/qemu/{vmid}/spiceproxy', node=node)['data']
    return core_types.services.ConsoleConnectionInfo(
        type=res['type'],
        proxy=res['proxy'],
        address=res['host'],
        port=res.get('port', None),
        secure_port=res['tls-port'],
        cert_subject=res['host-subject'],
        ticket=core_types.services.ConsoleConnectionTicket(value=res['password']),
        ca=res.get('ca', None),
    )
# Sample data:
# 'data': {'proxy': 'http://pvealone.dkmon.com:3128',
# 'release-cursor': 'Ctrl+Alt+R',
# 'host': 'pvespiceproxy:63489cf9:101:pvealone::c934cf7f7570012bbebab9e1167402b6471aae16',
# 'delete-this-file': 1,
# 'secure-attention': 'Ctrl+Alt+Ins',
# 'title': 'VM 101 - VM-1',
# 'password': '31a189dd71ce859867e28dd68ba166a701e77eed',
# 'type': 'spice',
# 'toggle-fullscreen': 'Shift+F11',
# 'host-subject': 'OU=PVE Cluster Node,O=Proxmox Virtual Environment,CN=pvealone.dkmon.com',
# 'tls-port': 61000,
# 'ca': '-----BEGIN CERTIFICATE-----\\n......\\n-----END CERTIFICATE-----\\n'}}
@ensure_connected
def journal(self, node: str, lastentries: int = 4, **kwargs: typing.Any) -> list[str]:
    """Return the last journal entries of a node; empty list on any error."""
    try:
        entries: list[str] = self._get(f'nodes/{node}/journal?lastentries={lastentries}')['data']
        return entries
    except Exception:
        return []

View File

@ -0,0 +1,42 @@
#
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE."""
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# DEFAULT_PORT = 8006
from uds.core import consts
import typing
# Default cache duration for most cached API results
CACHE_DURATION: typing.Final[int] = consts.cache.DEFAULT_CACHE_TIMEOUT
# Shorter duration for frequently-changing info (VM status, node stats, ...)
CACHE_INFO_DURATION: typing.Final[int] = consts.cache.SHORT_CACHE_TIMEOUT
# Cache duration is 3 minutes, so this is 60 mins * 24 = 1 day (default)
CACHE_DURATION_LONG: typing.Final[int] = consts.cache.EXTREME_CACHE_TIMEOUT

View File

@ -0,0 +1,56 @@
#
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE."""
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from uds.core.services.generics import exceptions
class ProxmoxError(exceptions.Error):
    """Base class for all Proxmox client errors."""
    pass
class ProxmoxConnectionError(exceptions.RetryableError):
    """Connection to the Proxmox API failed; the operation may be retried."""
    pass
class ProxmoxAuthError(exceptions.FatalError):
    """Authentication against the Proxmox API failed; not retryable."""
    pass
class ProxmoxNotFound(exceptions.NotFoundError):
    """The requested Proxmox object (VM, node, pool, ...) does not exist."""
    pass
class ProxmoxNodeUnavailableError(ProxmoxConnectionError):
    """No node able to answer the request could be reached."""
    pass
class ProxmoxNoGPUError(ProxmoxError):
    """A vGPU was required but the node has none (or none available)."""
    pass

View File

@ -30,9 +30,9 @@ def _from_dict(
extra = extra or {}
return type(
**{
k: typing.cast(typing.Callable[..., typing.Any], CONVERSORS.get(type.__annotations__.get(k, str), lambda x: x))(
dictionary.get(k, extra.get(k, None))
)
k: typing.cast(
typing.Callable[..., typing.Any], CONVERSORS.get(type.__annotations__.get(k, str), lambda x: x)
)(dictionary.get(k, extra.get(k, None)))
for k in type._fields # pyright: ignore # _fields is a NamedTuple attribute that contains fields
}
)

View File

@ -46,7 +46,7 @@ from .deployment_fixed import ProxmoxUserServiceFixed
if typing.TYPE_CHECKING:
from uds import models
from . import proxmox
from .proxmox import types as prox_types
from .provider import ProxmoxProvider
logger = logging.getLogger(__name__)
@ -113,7 +113,7 @@ class ProxmoxServiceFixed(FixedService): # pylint: disable=too-many-public-meth
def provider(self) -> 'ProxmoxProvider':
return typing.cast('ProxmoxProvider', super().provider())
def get_machine_info(self, vmId: int) -> 'proxmox.types.VMInfo':
def get_machine_info(self, vmId: int) -> 'prox_types.VMInfo':
return self.provider().get_machine_info(vmId, self.pool.value.strip())
def is_avaliable(self) -> bool:

View File

@ -47,7 +47,7 @@ from .publication import ProxmoxPublication
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from . import proxmox
from .proxmox import types as prox_types
from .provider import ProxmoxProvider
from uds.core.services.generics.dynamic.publication import DynamicPublication
from uds.core.services.generics.dynamic.service import DynamicService
@ -207,7 +207,7 @@ class ProxmoxServiceLinked(DynamicService):
"""
return re.sub("[^a-zA-Z0-9_-]", "-", name)
def clone_machine(self, name: str, description: str, vmid: int = -1) -> 'proxmox.types.VmCreationResult':
def clone_machine(self, name: str, description: str, vmid: int = -1) -> 'prox_types.VmCreationResult':
name = self.sanitized_name(name)
pool = self.pool.value or None
if vmid == -1: # vmId == -1 if cloning for template
@ -230,14 +230,14 @@ class ProxmoxServiceLinked(DynamicService):
must_have_vgpus={'1': True, '2': False}.get(self.gpu.value, None),
)
def get_machine_info(self, vmid: int) -> 'proxmox.types.VMInfo':
def get_machine_info(self, vmid: int) -> 'prox_types.VMInfo':
return self.provider().get_machine_info(vmid, self.pool.value.strip())
def get_nic_mac(self, vmid: int) -> str:
config = self.provider().get_machine_configuration(vmid)
return config.networks[0].mac.lower()
def xremove_machine(self, vmid: int) -> 'proxmox.types.UPID':
def xremove_machine(self, vmid: int) -> 'prox_types.UPID':
# First, remove from HA if needed
try:
self.disable_machine_ha(vmid)