1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Refactor ProxmoxProvider to support API Token authentication

This commit is contained in:
Adolfo Gómez García 2024-08-16 18:08:34 +02:00
parent 57f70b9d4f
commit 01353528dd
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
4 changed files with 135 additions and 51 deletions

View File

@ -30,7 +30,8 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
import typing
import logging
from uds.services.Proxmox.proxmox import (
@ -63,7 +64,8 @@ class TestProxmoxClient(UDSTransactionTestCase):
port=int(v['port']),
username=v['username'],
password=v['password'],
verify_ssl=True,
use_api_token=v['use_api_token'] == 'true',
verify_ssl=False,
)
for vm in self.pclient.list_vms():
@ -76,3 +78,45 @@ class TestProxmoxClient(UDSTransactionTestCase):
for pool in self.pclient.list_pools():
if pool.id == v['test_pool']: # id is the pool name in proxmox
self.pool = pool
def _get_new_vmid(self) -> int:
MAX_RETRIES: typing.Final[int] = 512 # So we don't loop forever, just in case...
vmid = 0
for _ in range(MAX_RETRIES):
vmid = 100000 + random.randint(0, 899999) # Get a reasonable vmid
if self.pclient.is_vmid_available(vmid):
return vmid
# All assigned vmid will be left as unusable on UDS until released by time (3 years)
# This is not a problem at all, in the rare case that a machine id is released from uds db
# if it exists when we try to create a new one, we will simply try to get another one
self.fail(f'Could not get a new vmid!!: last tried {vmid}')
# Connect is not needed, because setUp will do the connection so if it fails, the test will throw an exception
def test_list_vms(self):
vms = self.pclient.list_vms()
# At least, the test vm should be there :)
self.assertTrue(len(vms) > 0)
self.assertTrue(self.vm.id > 0)
self.assertTrue(self.vm.status in prox_types.VMStatus)
self.assertTrue(self.vm.node)
self.assertTrue(self.vm.template in (True, False))
self.assertIsInstance(self.vm.agent, (str, type(None)))
self.assertIsInstance(self.vm.cpus, (int, type(None)))
self.assertIsInstance(self.vm.lock, (str, type(None)))
self.assertIsInstance(self.vm.disk, (int, type(None)))
self.assertIsInstance(self.vm.maxdisk, (int, type(None)))
self.assertIsInstance(self.vm.mem, (int, type(None)))
self.assertIsInstance(self.vm.maxmem, (int, type(None)))
self.assertIsInstance(self.vm.name, (str, type(None)))
self.assertIsInstance(self.vm.pid, (int, type(None)))
self.assertIsInstance(self.vm.qmpstatus, (str, type(None)))
self.assertIsInstance(self.vm.tags, (str, type(None)))
self.assertIsInstance(self.vm.uptime, (int, type(None)))
self.assertIsInstance(self.vm.netin, (int, type(None)))
self.assertIsInstance(self.vm.netout, (int, type(None)))
self.assertIsInstance(self.vm.diskread, (int, type(None)))
self.assertIsInstance(self.vm.diskwrite, (int, type(None)))
self.assertIsInstance(self.vm.vgpu_type, (str, type(None)))

View File

@ -82,10 +82,19 @@ class ProxmoxProvider(services.ServiceProvider):
default=8006,
)
use_api_token = gui.CheckBoxField(
label=_('Use API Token'),
order=3,
tooltip=_(
'Use API Token and secret instead of password. (username must contain the Token ID, the password will be the secret)'
),
required=False,
default=False,
)
username = gui.TextField(
length=32,
label=_('Username'),
order=3,
order=4,
tooltip=_('User with valid privileges on Proxmox, (use "user@authenticator" form)'),
required=True,
default='root@pam',
@ -93,7 +102,7 @@ class ProxmoxProvider(services.ServiceProvider):
password = gui.PasswordField(
length=32,
label=_('Password'),
order=4,
order=5,
tooltip=_('Password of the user of Proxmox'),
required=True,
)
@ -133,6 +142,7 @@ class ProxmoxProvider(services.ServiceProvider):
self.port.as_int(),
self.username.value,
self.password.value,
self.use_api_token.value,
self.timeout.as_int(),
False,
self.cache,

View File

@ -57,6 +57,7 @@ class ProxmoxClient:
_host: str
_port: int
_credentials: tuple[tuple[str, str], tuple[str, str]]
_use_api_token: bool
_url: str
_verify_ssl: bool
_timeout: int
@ -74,6 +75,7 @@ class ProxmoxClient:
port: int,
username: str,
password: str,
use_api_token: bool = False,
timeout: int = 5,
verify_ssl: bool = False,
cache: typing.Optional['Cache'] = None,
@ -81,6 +83,7 @@ class ProxmoxClient:
self._host = host
self._port = port
self._credentials = (('username', username), ('password', password))
self._use_api_token = use_api_token
self._verify_ssl = verify_ssl
self._timeout = timeout
self._url = 'https://{}:{}/api2/json/'.format(self._host, self._port)
@ -101,47 +104,62 @@ class ProxmoxClient:
return self._session
self._session = security.secure_requests_session(verify=self._verify_ssl)
self._session.headers.update(
{
if self._use_api_token:
token = f'{self._credentials[0][1]}={self._credentials[1][1]}'
# Set _ticket to something, so we don't try to connect again
self._ticket = 'API_TOKEN' # Using API token, not a real ticket
self._session.headers.update(
{
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
)
def _update_session(ticket: str, csrf: str) -> None:
session = typing.cast('requests.Session', self._session)
self._ticket = ticket
self._csrf = csrf
session.headers.update({'CSRFPreventionToken': self._csrf})
session.cookies.update( # pyright: ignore[reportUnknownMemberType]
{'PVEAuthCookie': self._ticket},
'Content-Type': 'application/json',
'Authorization': f'PVEAPIToken={token}',
}
)
else:
self._session.headers.update(
{
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
)
# we could cache this for a while, we know that at least for 30 minutes
if self.cache and not force:
dc = self.cache.get(self._host + 'conn')
if dc: # Stored on cache
_update_session(*dc) # Set session data, dc has ticket, csrf
def _update_session(ticket: str, csrf: str) -> None:
session = typing.cast('requests.Session', self._session)
self._ticket = ticket
self._csrf = csrf
session.headers.update({'CSRFPreventionToken': self._csrf})
session.cookies.update( # pyright: ignore[reportUnknownMemberType]
{'PVEAuthCookie': self._ticket},
)
try:
result = self._session.post(
url=self.get_api_url('access/ticket'),
data=self._credentials,
timeout=self._timeout,
)
if not result.ok:
raise exceptions.ProxmoxAuthError(result.content.decode('utf8'))
data = result.json()['data']
ticket = data['ticket']
csrf = data['CSRFPreventionToken']
# we could cache this for a while, we know that at least for 30 minutes
if self.cache and not force:
dc = self.cache.get(self._host + 'conn')
if dc: # Stored on cache
_update_session(*dc) # Set session data, dc has ticket, csrf
if self.cache:
self.cache.put(self._host + 'conn', (ticket, csrf), validity=1800) # 30 minutes
try:
result = self._session.post(
url=self.get_api_url('access/ticket'),
data=self._credentials,
timeout=self._timeout,
)
if not result.ok:
raise exceptions.ProxmoxAuthError(result.content.decode('utf8'))
data = result.json()['data']
ticket = data['ticket']
csrf = data['CSRFPreventionToken']
if self.cache:
self.cache.put(self._host + 'conn', (ticket, csrf), validity=1800) # 30 minutes
_update_session(ticket, csrf)
except requests.RequestException as e:
raise exceptions.ProxmoxConnectionError(str(e)) from e
return self._session
_update_session(ticket, csrf)
return self._session
except requests.RequestException as e:
raise exceptions.ProxmoxConnectionError(str(e)) from e
def ensure_correct(self, response: 'requests.Response', *, node: typing.Optional[str]) -> typing.Any:
if not response.ok:
@ -240,6 +258,10 @@ class ProxmoxClient:
def test(self) -> bool:
try:
self.connect()
if self._use_api_token:
# When using api token, we need to ask for something
# Because the login has not been done, just the token has been set on headers
self.get_cluster_info()
except Exception:
# logger.error('Error testing proxmox: %s', e)
return False
@ -249,6 +271,13 @@ class ProxmoxClient:
def get_cluster_info(self, **kwargs: typing.Any) -> types.ClusterInfo:
return types.ClusterInfo.from_dict(self.do_get('cluster/status'))
@cached('cluster_res', consts.CACHE_DURATION, key_helper=caching_key_helper)
def get_cluster_resources(
self, type: typing.Literal['vm', 'storage', 'node', 'sdn'], **kwargs: typing.Any
) -> list[dict[str, typing.Any]]:
# i.e.: self.do_get('cluster/resources?type=vm')
return self.do_get(f'cluster/resources?type={type}')['data']
def get_next_vmid(self) -> int:
return int(self.do_get('cluster/nextid')['data'])
@ -562,7 +591,7 @@ class ProxmoxClient:
return sorted(
[
types.VMInfo.from_dict(vm_info)
for vm_info in self.do_get('cluster/resources?type=vm')['data']
for vm_info in self.get_cluster_resources('vm')
if vm_info['type'] == 'qemu' and vm_info['node'] in node_list
],
key=lambda x: f'{x.node}{x.name}',
@ -702,17 +731,18 @@ class ProxmoxClient:
) -> list[types.StorageInfo]:
"""We use a list for storage instead of an iterator, so we can cache it..."""
node_list: set[str]
if node is None:
node_list = {n.name for n in self.get_cluster_info().nodes if n.online}
elif isinstance(node, str):
node_list = set([node])
else:
node_list = set(node)
match node:
case None:
node_list = {n.name for n in self.get_cluster_info().nodes if n.online}
case str():
node_list = {node}
case collections.abc.Iterable():
node_list = set(node)
return sorted(
[
types.StorageInfo.from_dict(st_info)
for st_info in self.do_get('cluster/resources?type=storage')['data']
for st_info in self.get_cluster_resources('storage')
if st_info['node'] in node_list and (content is None or content in st_info['content'])
],
key=lambda x: f'{x.node}{x.storage}',

View File

@ -198,7 +198,7 @@ class TaskStatus:
starttime=datetime.datetime.fromtimestamp(data['starttime']),
type=data['type'],
status=data['status'],
exitstatus=data['exitstatus'],
exitstatus=data.get('exitstatus', ''),
user=data['user'],
upid=data['upid'],
id=data['id'],