Improved Proxmox query efficiency for machines in a pool

This commit is contained in:
Adolfo Gómez García 2021-06-25 13:21:09 +02:00
parent 52ac406853
commit 655a6447ba
3 changed files with 187 additions and 81 deletions

View File

@ -44,6 +44,7 @@ from uds.core.util.decorators import allowCache, ensureConected
# DEFAULT_PORT = 8006
CACHE_DURATION = 120 # Keep cache 2 minutes by default
CACHE_INFO_DURATION = 30
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -68,13 +69,16 @@ class ProxmoxAuthError(ProxmoxError):
class ProxmoxNotFound(ProxmoxError):
    """Error signalling that a requested Proxmox object could not be found.

    NOTE(review): ProxmoxClient raises this once a lookup has exhausted
    its candidates; confirm the exact call sites against the full file.
    """
    pass
class ProxmoxNodeUnavailableError(ProxmoxConnectionError):
    """Connection error specialised for an unavailable Proxmox node.

    Being a ProxmoxConnectionError subclass, handlers that catch
    ProxmoxConnectionError also catch this exception.
    NOTE(review): presumably raised when a cluster node is down while
    it is being queried -- confirm against the raising code.
    """
    pass
# caching helper
def cachingKeyHelper(obj: 'ProxmoxClient') -> str:
    """Return the host of the given client.

    Passed as ``cachingKeyFnc`` to the ``allowCache`` decorators on
    ProxmoxClient methods.
    NOTE(review): presumably this keeps cache entries of different
    Proxmox hosts apart -- confirm in ``allowCache``.
    """
    return obj._host # pylint: disable=protected-access
class ProxmoxClient:
_host: str
_port: int
@ -89,15 +93,15 @@ class ProxmoxClient:
cache: typing.Optional['Cache']
def __init__(
self,
host: str,
port: int,
username: str,
password: str,
timeout: int = 5,
validateCertificate: bool = False,
cache: typing.Optional['Cache'] = None
) -> None:
self,
host: str,
port: int,
username: str,
password: str,
timeout: int = 5,
validateCertificate: bool = False,
cache: typing.Optional['Cache'] = None,
) -> None:
self._host = host
self._port = port
self._credentials = (('username', username), ('password', password))
@ -110,7 +114,7 @@ class ProxmoxClient:
self._ticket = ''
self._csrf = ''
# Disable warnings from urllib for
# Disable warnings from urllib for
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@property
@ -118,7 +122,7 @@ class ProxmoxClient:
return {
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
'CSRFPreventionToken': self._csrf
'CSRFPreventionToken': self._csrf,
}
@staticmethod
@ -150,38 +154,52 @@ class ProxmoxClient:
headers=self.headers,
cookies={'PVEAuthCookie': self._ticket},
verify=self._validateCert,
timeout=self._timeout
timeout=self._timeout,
)
logger.debug('GET result to %s: %s -- %s', path, result.status_code, result.content)
logger.debug(
'GET result to %s: %s -- %s', path, result.status_code, result.content
)
return ProxmoxClient.checkError(result)
def _post(self, path: str, data: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None) -> typing.Any:
def _post(
self,
path: str,
data: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
) -> typing.Any:
result = requests.post(
self._getPath(path),
data=data,
headers=self.headers,
cookies={'PVEAuthCookie': self._ticket},
verify=self._validateCert,
timeout=self._timeout
timeout=self._timeout,
)
logger.debug('POST result to %s: %s -- %s', path, result.status_code, result.content)
logger.debug(
'POST result to %s: %s -- %s', path, result.status_code, result.content
)
return ProxmoxClient.checkError(result)
def _delete(self, path: str, data: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None) -> typing.Any:
def _delete(
self,
path: str,
data: typing.Optional[typing.Iterable[typing.Tuple[str, str]]] = None,
) -> typing.Any:
result = requests.delete(
self._getPath(path),
data=data,
headers=self.headers,
cookies={'PVEAuthCookie': self._ticket},
verify=self._validateCert,
timeout=self._timeout
timeout=self._timeout,
)
logger.debug('DELETE result to %s: %s -- %s', path, result.status_code, result.content)
logger.debug(
'DELETE result to %s: %s -- %s', path, result.status_code, result.content
)
return ProxmoxClient.checkError(result)
@ -202,7 +220,7 @@ class ProxmoxClient:
data=self._credentials,
headers=self.headers,
verify=self._validateCert,
timeout=self._timeout
timeout=self._timeout,
)
if not result.ok:
raise ProxmoxAuthError()
@ -211,7 +229,9 @@ class ProxmoxClient:
self._csrf = data['CSRFPreventionToken']
if self.cache:
self.cache.put(self._host + 'conn', (self._ticket, self._csrf), validity=1800) # 30 minutes
self.cache.put(
self._host + 'conn', (self._ticket, self._csrf), validity=1800
) # 30 minutes
except requests.RequestException as e:
raise ProxmoxConnectionError from e
@ -233,7 +253,13 @@ class ProxmoxClient:
return int(self._get('cluster/nextid')['data'])
@ensureConected
@allowCache('nodeNets', CACHE_DURATION, cachingArgs=1, cachingKWArgs=['node'], cachingKeyFnc=cachingKeyHelper)
@allowCache(
'nodeNets',
CACHE_DURATION,
cachingArgs=1,
cachingKWArgs=['node'],
cachingKeyFnc=cachingKeyHelper,
)
def getNodeNetworks(self, node: str, **kwargs):
return self._get('nodes/{}/network'.format(node))['data']
@ -241,15 +267,13 @@ class ProxmoxClient:
def getBestNodeForVm(self, minMemory: int = 0) -> typing.Optional[types.NodeStats]:
best = types.NodeStats.empty()
node: types.NodeStats
weightFnc = lambda x: (x.mem / x .maxmem) + (x.cpu / x.maxcpu) * 1.3
for node in self.getNodesStats():
if node.status != 'online': # Offline nodes are not "the best"
continue
weightFnc = lambda x: (x.mem / x.maxmem) + (x.cpu / x.maxcpu) * 1.3
# Offline nodes are not "the best"
for node in filter(lambda x: x.status == 'online', self.getNodesStats()):
if minMemory and node.mem < minMemory + 512000000: # 512 MB reserved
continue # Skips nodes with not enouhg memory
if weightFnc(node) < weightFnc(best):
best = node
@ -266,7 +290,7 @@ class ProxmoxClient:
linkedClone: bool,
toNode: typing.Optional[str] = None,
toStorage: typing.Optional[str] = None,
toPool: typing.Optional[str] = None
toPool: typing.Optional[str] = None,
) -> types.VmCreationResult:
newVmId = self.getNextVMId()
vmInfo = self.getVmInfo(vmId)
@ -279,7 +303,11 @@ class ProxmoxClient:
if toStorage and self.getStorage(toStorage, vmInfo.node).shared:
node = self.getBestNodeForVm(minMemory=-1)
if node is None:
raise ProxmoxError('No switable node available for new vm {} on Proxmox'.format(name))
raise ProxmoxError(
'No switable node available for new vm {} on Proxmox'.format(
name
)
)
toNode = node.name
else:
toNode = fromNode
@ -305,7 +333,9 @@ class ProxmoxClient:
params.append(('pool', toPool))
if linkedClone is False:
params.append(('format', 'qcow2')) # Ensure clone for templates is on qcow2 format
params.append(
('format', 'qcow2')
) # Ensure clone for templates is on qcow2 format
logger.debug('PARAMS: %s', params)
@ -313,11 +343,8 @@ class ProxmoxClient:
node=toNode,
vmid=newVmId,
upid=types.UPID.fromDict(
self._post(
'nodes/{}/qemu/{}/clone'.format(fromNode, vmId),
data=params
)
)
self._post('nodes/{}/qemu/{}/clone'.format(fromNode, vmId), data=params)
),
)
@ensureConected
@ -326,16 +353,19 @@ class ProxmoxClient:
return [g['group'] for g in self._get('cluster/ha/groups')['data']]
@ensureConected
def enableVmHA(self, vmId: int, started: bool = False, group: typing.Optional[str] = None) -> None:
def enableVmHA(
self, vmId: int, started: bool = False, group: typing.Optional[str] = None
) -> None:
self._post(
'cluster/ha/resources',
data=[
('sid', 'vm:{}'.format(vmId)),
('comment', 'UDS HA VM'),
('state', 'started' if started else 'stopped'),
('state', 'started' if started else 'stopped'),
('max_restart', '4'),
('max_relocate', '4')
] + ([('group', group)] if group else [])
('max_relocate', '4'),
]
+ ([('group', group)] if group else []),
)
@ensureConected
@ -346,32 +376,39 @@ class ProxmoxClient:
logger.exception('removeFromHA')
@ensureConected
def setProtection(self, vmId: int, node: typing.Optional[str] = None, protection: bool=False) -> None:
def setProtection(
self, vmId: int, node: typing.Optional[str] = None, protection: bool = False
) -> None:
params: typing.List[typing.Tuple[str, str]] = [
('protection', str(int(protection))),
]
node = node or self.getVmInfo(vmId).node
self._post(
'nodes/{}/qemu/{}/config'.format(node, vmId),
data=params
)
self._post('nodes/{}/qemu/{}/config'.format(node, vmId), data=params)
@ensureConected
def deleteVm(self, vmId: int, node: typing.Optional[str] = None, purge: bool = True) -> types.UPID:
def deleteVm(
self, vmId: int, node: typing.Optional[str] = None, purge: bool = True
) -> types.UPID:
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(
self._delete(
'nodes/{}/qemu/{}'.format(node, vmId)
)
)
return types.UPID.fromDict(self._delete('nodes/{}/qemu/{}'.format(node, vmId)))
@ensureConected
def getTask(self, node: str, upid: str) -> types.TaskStatus:
return types.TaskStatus.fromJson(self._get('nodes/{}/tasks/{}/status'.format(node, urllib.parse.quote(upid))))
return types.TaskStatus.fromJson(
self._get('nodes/{}/tasks/{}/status'.format(node, urllib.parse.quote(upid)))
)
@ensureConected
@allowCache('vms', CACHE_DURATION, cachingArgs=1, cachingKWArgs='node', cachingKeyFnc=cachingKeyHelper)
def listVms(self, node: typing.Union[None, str, typing.Iterable[str]] = None) -> typing.List[types.VMInfo]:
@allowCache(
'vms',
CACHE_DURATION,
cachingArgs=1,
cachingKWArgs='node',
cachingKeyFnc=cachingKeyHelper,
)
def listVms(
self, node: typing.Union[None, str, typing.Iterable[str]] = None
) -> typing.List[types.VMInfo]:
nodeList: typing.Iterable[str]
if node is None:
nodeList = [n.name for n in self.getClusterInfo().nodes if n.online]
@ -389,13 +426,40 @@ class ProxmoxClient:
return sorted(result, key=lambda x: '{}{}'.format(x.node, x.name))
@ensureConected
# @allowCache('vmi', CACHE_DURATION, cachingArgs=[1, 2], cachingKWArgs=['vmId', 'node'], cachingKeyFnc=cachingKeyHelper)
def getVmInfo(self, vmId: int, node: typing.Optional[str] = None, **kwargs) -> types.VMInfo:
nodes = [types.Node(node, False, False, 0, '', '', '')] if node else self.getClusterInfo().nodes
@allowCache(
    'vmip',
    CACHE_INFO_DURATION,
    cachingArgs=[1, 2],
    cachingKWArgs=['vmId', 'poolId'],
    cachingKeyFnc=cachingKeyHelper,
)
def getVMPoolInfo(
    self, vmId: int, poolId: typing.Optional[str], **kwargs
) -> types.VMInfo:
    """Return the info of a VM, using its pool to locate the hosting node.

    When ``poolId`` is given, the members of that pool are requested and
    searched for ``vmId``. If found, the member's node is handed to
    ``getVmInfo`` so it only has to query that node. Any failure while
    looking the pool up is non fatal: ``getVmInfo`` is then called with
    no node, in which case it walks the nodes of the cluster itself.

    Args:
        vmId: Identifier of the virtual machine.
        poolId: Pool the machine is expected to belong to. ``None`` or an
            empty string skips the pool lookup.
        **kwargs: Not used by this method itself.
            NOTE(review): presumably consumed by the caching decorator
            (callers pass ``force=True``) -- confirm in ``allowCache``.

    Returns:
        Whatever ``getVmInfo(vmId, node)`` returns.
    """
    node: typing.Optional[str] = None
    if poolId:
        try:
            members = self._get(f'pools/{poolId}')['data']['members']
            for member in members:
                try:
                    if member['vmid'] == vmId:
                        node = member['node']
                        break
                except (KeyError, TypeError):
                    # Pool member without 'vmid'/'node' (or not a mapping
                    # at all): it cannot be our machine, skip it
                    continue
        except Exception as e:  # Error requesting pool, fallback to getVmInfo
            # Best effort only: keep the fallback, but leave a trace so a
            # misconfigured pool does not go unnoticed
            logger.debug(
                'Could not use pool %s to locate vm %s, falling back to getVmInfo: %s',
                poolId,
                vmId,
                e,
            )

    return self.getVmInfo(vmId, node)
@ensureConected
@allowCache('vmin', CACHE_INFO_DURATION, cachingArgs=[1, 2], cachingKWArgs=['vmId', 'node'], cachingKeyFnc=cachingKeyHelper)
def getVmInfo(
self, vmId: int, node: typing.Optional[str] = None, **kwargs
) -> types.VMInfo:
nodes = (
[types.Node(node, False, False, 0, '', '', '')]
if node
else self.getClusterInfo().nodes
)
anyNodeIsDown = False
for n in nodes:
try:
vm = self._get('nodes/{}/qemu/{}/status/current'.format(n.name, vmId))['data']
vm = self._get('nodes/{}/qemu/{}/status/current'.format(n.name, vmId))[
'data'
]
vm['node'] = n.name
return types.VMInfo.fromDict(vm)
except ProxmoxConnectionError:
@ -410,40 +474,53 @@ class ProxmoxClient:
raise ProxmoxNotFound()
@ensureConected
# @allowCache('vmc', CACHE_DURATION, cachingArgs=[1, 2], cachingKWArgs=['vmId', 'node'], cachingKeyFnc=cachingKeyHelper)
def getVmConfiguration(self, vmId: int, node: typing.Optional[str] = None, **kwargs):
def getVmConfiguration(
self, vmId: int, node: typing.Optional[str] = None, **kwargs
):
node = node or self.getVmInfo(vmId).node
return types.VMConfiguration.fromDict(self._get('nodes/{}/qemu/{}/config'.format(node, vmId))['data'])
return types.VMConfiguration.fromDict(
self._get('nodes/{}/qemu/{}/config'.format(node, vmId))['data']
)
@ensureConected
def startVm(self, vmId: int, node: typing.Optional[str] = None) -> types.UPID:
# if exitstatus is "OK" or contains "already running", all is fine
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(self._post('nodes/{}/qemu/{}/status/start'.format(node, vmId)))
return types.UPID.fromDict(
self._post('nodes/{}/qemu/{}/status/start'.format(node, vmId))
)
@ensureConected
def stopVm(self, vmId: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(self._post('nodes/{}/qemu/{}/status/stop'.format(node, vmId)))
return types.UPID.fromDict(
self._post('nodes/{}/qemu/{}/status/stop'.format(node, vmId))
)
@ensureConected
def resetVm(self, vmId: int, node: typing.Optional[str] = None) -> types.UPID:
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(self._post('nodes/{}/qemu/{}/status/reset'.format(node, vmId)))
return types.UPID.fromDict(
self._post('nodes/{}/qemu/{}/status/reset'.format(node, vmId))
)
@ensureConected
def suspendVm(self, vmId: int, node: typing.Optional[str] = None) -> types.UPID:
# if exitstatus is "OK" or contains "already running", all is fine
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(self._post('nodes/{}/qemu/{}/status/suspend'.format(node, vmId)))
return types.UPID.fromDict(
self._post('nodes/{}/qemu/{}/status/suspend'.format(node, vmId))
)
@ensureConected
def shutdownVm(self, vmId: int, node: typing.Optional[str] = None) -> types.UPID:
# if exitstatus is "OK" or contains "already running", all is fine
node = node or self.getVmInfo(vmId).node
return types.UPID.fromDict(self._post('nodes/{}/qemu/{}/status/shutdown'.format(node, vmId)))
return types.UPID.fromDict(
self._post('nodes/{}/qemu/{}/status/shutdown'.format(node, vmId))
)
@ensureConected
def convertToTemplate(self, vmId: int, node: typing.Optional[str] = None) -> None:
@ -454,15 +531,35 @@ class ProxmoxClient:
resumeVm = startVm
@ensureConected
@allowCache('storage', CACHE_DURATION, cachingArgs=[1, 2], cachingKWArgs=['storage', 'node'], cachingKeyFnc=cachingKeyHelper)
@allowCache(
'storage',
CACHE_DURATION,
cachingArgs=[1, 2],
cachingKWArgs=['storage', 'node'],
cachingKeyFnc=cachingKeyHelper,
)
def getStorage(self, storage: str, node: str, **kwargs) -> types.StorageInfo:
return types.StorageInfo.fromDict(self._get('nodes/{}/storage/{}/status'.format(node, urllib.parse.quote(storage)))['data'])
return types.StorageInfo.fromDict(
self._get(
'nodes/{}/storage/{}/status'.format(node, urllib.parse.quote(storage))
)['data']
)
@ensureConected
@allowCache('storages', CACHE_DURATION, cachingArgs=[1, 2], cachingKWArgs=['node', 'content'], cachingKeyFnc=cachingKeyHelper)
def listStorages(self, node: typing.Union[None, str, typing.Iterable[str]] = None, content: typing.Optional[str] = None, **kwargs) -> typing.List[types.StorageInfo]:
"""We use a list for storage instead of an iterator, so we can cache it...
"""
@allowCache(
'storages',
CACHE_DURATION,
cachingArgs=[1, 2],
cachingKWArgs=['node', 'content'],
cachingKeyFnc=cachingKeyHelper,
)
def listStorages(
self,
node: typing.Union[None, str, typing.Iterable[str]] = None,
content: typing.Optional[str] = None,
**kwargs
) -> typing.List[types.StorageInfo]:
"""We use a list for storage instead of an iterator, so we can cache it..."""
nodeList: typing.Iterable[str]
if node is None:
nodeList = [n.name for n in self.getClusterInfo().nodes if n.online]
@ -470,11 +567,15 @@ class ProxmoxClient:
nodeList = [node]
else:
nodeList = node
params = '' if not content else '?content={}'.format(urllib.parse.quote(content))
params = (
'' if not content else '?content={}'.format(urllib.parse.quote(content))
)
result: typing.List[types.StorageInfo] = []
for nodeName in nodeList:
for storage in self._get('nodes/{}/storage{}'.format(nodeName, params))['data']:
for storage in self._get('nodes/{}/storage{}'.format(nodeName, params))[
'data'
]:
storage['node'] = nodeName
storage['content'] = storage['content'].split(',')
result.append(types.StorageInfo.fromDict(storage))
@ -482,11 +583,16 @@ class ProxmoxClient:
return result
@ensureConected
@allowCache('nodeStats', CACHE_DURATION//6, cachingKeyFnc=cachingKeyHelper)
@allowCache('nodeStats', CACHE_INFO_DURATION, cachingKeyFnc=cachingKeyHelper)
def getNodesStats(self, **kwargs) -> typing.List[types.NodeStats]:
return [types.NodeStats.fromDict(nodeStat) for nodeStat in self._get('cluster/resources?type=node')['data']]
return [
types.NodeStats.fromDict(nodeStat)
for nodeStat in self._get('cluster/resources?type=node')['data']
]
@ensureConected
@allowCache('pools', CACHE_DURATION//6, cachingKeyFnc=cachingKeyHelper)
@allowCache('pools', CACHE_DURATION // 6, cachingKeyFnc=cachingKeyHelper)
def listPools(self) -> typing.List[types.PoolInfo]:
return [types.PoolInfo.fromDict(nodeStat) for nodeStat in self._get('pools')['data']]
return [
types.PoolInfo.fromDict(nodeStat) for nodeStat in self._get('pools')['data']
]

View File

@ -107,8 +107,8 @@ class ProxmoxProvider(services.ServiceProvider): # pylint: disable=too-many-pub
def listMachines(self) -> typing.List[client.types.VMInfo]:
    """Return the machines reported by ``listVms()`` on the API client.

    ``listVms`` is called with no arguments.
    """
    return self.__getApi().listVms()
def getMachineInfo(self, vmId: int) -> client.types.VMInfo:
return self.__getApi().getVmInfo(vmId, force=True)
def getMachineInfo(self, vmId: int, poolId: typing.Optional[str] = None) -> client.types.VMInfo:
return self.__getApi().getVMPoolInfo(vmId, poolId, force=True)
def getMachineConfiguration(self, vmId: int) -> client.types.VMConfiguration:
    """Return the configuration of machine ``vmId`` from the API client.

    Delegates to ``getVmConfiguration`` and passes ``force=True`` as a
    keyword argument.
    NOTE(review): ``force=True`` presumably requests a non-cached
    result -- confirm against the client's caching decorator.
    """
    return self.__getApi().getVmConfiguration(vmId, force=True)

View File

@ -236,7 +236,7 @@ class ProxmoxLinkedService(Service): # pylint: disable=too-many-public-methods
)
def getMachineInfo(self, vmId: int) -> 'client.types.VMInfo':
return self.parent().getMachineInfo(vmId)
return self.parent().getMachineInfo(vmId, self.pool.value.strip())
def getMac(self, vmId: int) -> str:
config = self.parent().getMachineConfiguration(vmId)