mirror of
https://github.com/dkmstr/openuds.git
synced 2025-10-23 23:34:07 +03:00
Compare commits
11 Commits
dev/janier
...
dev/andres
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53beb1d0f2 | ||
|
|
1ddbb1120b | ||
|
|
461917e652 | ||
|
|
33f5c1d34a | ||
|
|
a867ed7b6a | ||
|
|
67bbce579f | ||
|
|
cdd5f3c1d1 | ||
|
|
2731013710 | ||
|
|
39aa842d7f | ||
|
|
99b33d0603 | ||
|
|
398ec1aac0 |
2
actor
2
actor
Submodule actor updated: 4638fd77a2...04ce3fc2d1
2
client
2
client
Submodule client updated: 517f8935a2...5b044bca34
@@ -158,27 +158,29 @@ class MetaPools(ModelHandler[MetaPoolItem]):
|
|||||||
(i.pool.userServices.filter(state=State.PREPARING).count()) for i in all_pools
|
(i.pool.userServices.filter(state=State.PREPARING).count()) for i in all_pools
|
||||||
)
|
)
|
||||||
|
|
||||||
return MetaPoolItem(
|
val = {
|
||||||
id=item.uuid,
|
'id': item.uuid,
|
||||||
name=item.name,
|
'name': item.name,
|
||||||
short_name=item.short_name,
|
'short_name': item.short_name,
|
||||||
tags=[tag.tag for tag in item.tags.all()],
|
'tags': [tag.tag for tag in item.tags.all()],
|
||||||
comments=item.comments,
|
'comments': item.comments,
|
||||||
thumb=item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64,
|
'thumb': item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64,
|
||||||
image_id=item.image.uuid if item.image is not None else None,
|
'image_id': item.image.uuid if item.image is not None else None,
|
||||||
servicesPoolGroup_id=pool_group_id,
|
'servicesPoolGroup_id': pool_group_id,
|
||||||
pool_group_name=pool_group_name,
|
'pool_group_name': pool_group_name,
|
||||||
pool_group_thumb=pool_group_thumb,
|
'pool_group_thumb': pool_group_thumb,
|
||||||
user_services_count=userservices_total,
|
'user_services_count': userservices_total,
|
||||||
user_services_in_preparation=userservices_in_preparation,
|
'user_services_in_preparation': userservices_in_preparation,
|
||||||
visible=item.visible,
|
'visible': item.visible,
|
||||||
policy=str(item.policy),
|
'policy': str(item.policy),
|
||||||
fallbackAccess=item.fallbackAccess,
|
'fallbackAccess': item.fallbackAccess,
|
||||||
permission=permissions.effective_permissions(self._user, item),
|
'permission': permissions.effective_permissions(self._user, item),
|
||||||
calendar_message=item.calendar_message,
|
'calendar_message': item.calendar_message,
|
||||||
transport_grouping=item.transport_grouping,
|
'transport_grouping': str(item.transport_grouping),
|
||||||
ha_policy=str(item.ha_policy),
|
'ha_policy': str(item.ha_policy),
|
||||||
)
|
}
|
||||||
|
|
||||||
|
return val
|
||||||
|
|
||||||
# Gui related
|
# Gui related
|
||||||
def get_gui(self, for_type: str) -> list[types.ui.GuiElement]:
|
def get_gui(self, for_type: str) -> list[types.ui.GuiElement]:
|
||||||
|
|||||||
@@ -1,580 +0,0 @@
|
|||||||
import token
|
|
||||||
from typing import List, Dict, Any, Tuple
|
|
||||||
|
|
||||||
from py import log
|
|
||||||
from pyparsing import col
|
|
||||||
def print_resource_names(items: List[Dict[str, Any]], title: str) -> None:
|
|
||||||
logging.info("list of %s:", title)
|
|
||||||
for idx, item in enumerate(items, start=1):
|
|
||||||
name = item.get('metadata', {}).get('name', 'UNKNOWN')
|
|
||||||
logging.info("%d - %s", idx, name)
|
|
||||||
#
|
|
||||||
# Copyright (c) 2025 Virtual Cable S.L.U.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# Redistribution and use in source and binary forms, with or without modification,
|
|
||||||
# are permitted provided that the following conditions are met:
|
|
||||||
#
|
|
||||||
# * Redistributions of source code must retain the above copyright notice,
|
|
||||||
# this list of conditions and the following disclaimer.
|
|
||||||
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
||||||
# this list of conditions and the following disclaimer in the documentation
|
|
||||||
# and/or other materials provided with the distribution.
|
|
||||||
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
|
|
||||||
# may be used to endorse or promote products derived from this software
|
|
||||||
# without specific prior written permission.
|
|
||||||
#
|
|
||||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
||||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
||||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
||||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
|
||||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
||||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
||||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
||||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
||||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
||||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
||||||
"""
|
|
||||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
|
||||||
"""
|
|
||||||
import collections.abc
|
|
||||||
import typing
|
|
||||||
import datetime
|
|
||||||
import urllib.parse
|
|
||||||
import logging
|
|
||||||
import requests
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
from uds.core.util import security
|
|
||||||
from uds.core.util.cache import Cache
|
|
||||||
from uds.core.util.decorators import cached
|
|
||||||
from uds.core.util.model import sql_now
|
|
||||||
|
|
||||||
from . import types, consts, exceptions
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
class OpenshiftClient:
|
|
||||||
cluster_url: str
|
|
||||||
api_url: str
|
|
||||||
username: str
|
|
||||||
password: str
|
|
||||||
namespace: str
|
|
||||||
_verify_ssl: bool
|
|
||||||
_timeout: int
|
|
||||||
_token_expiry: datetime.datetime
|
|
||||||
|
|
||||||
_session: typing.Optional[requests.Session] = None
|
|
||||||
|
|
||||||
cache: typing.Optional['Cache']
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
cluster_url: str,
|
|
||||||
api_url: str,
|
|
||||||
username: str,
|
|
||||||
password: str,
|
|
||||||
namespace: str = 'default',
|
|
||||||
timeout: int = 5,
|
|
||||||
verify_ssl: bool = False,
|
|
||||||
cache: typing.Optional['Cache'] = None,
|
|
||||||
) -> None:
|
|
||||||
self.cluster_url = cluster_url
|
|
||||||
self.api_url = api_url
|
|
||||||
self.username : str = username
|
|
||||||
self.password: str = password
|
|
||||||
self.namespace: str = namespace
|
|
||||||
|
|
||||||
self._verify_ssl: bool = verify_ssl
|
|
||||||
self._timeout: int = timeout
|
|
||||||
|
|
||||||
self.cache = cache
|
|
||||||
|
|
||||||
self._access_token = ''
|
|
||||||
self._token_expiry = datetime.datetime.min
|
|
||||||
|
|
||||||
@property
|
|
||||||
def session(self) -> requests.Session:
|
|
||||||
return self.connect()
|
|
||||||
|
|
||||||
def connect(self, force: bool = False) -> requests.Session:
|
|
||||||
# For testing, always use the fixed token
|
|
||||||
session = self._session = security.secure_requests_session(verify=self._verify_ssl)
|
|
||||||
session.headers.update(
|
|
||||||
{
|
|
||||||
'Accept': 'application/json',
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
'Authorization': 'Bearer sha256~82_k2GpPJ4som5Vfy3P6kcQgeeHdxo-_txXR760K_tM',
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return session
|
|
||||||
|
|
||||||
def get_token(self) -> str|None:
|
|
||||||
return "sha256~82_k2GpPJ4som5Vfy3P6kcQgeeHdxo-_txXR760K_tM"
|
|
||||||
try:
|
|
||||||
url = f"{self.cluster_url}/oauth/authorize?client_id=openshift-challenging-client&response_type=token"
|
|
||||||
r = requests.get(url, auth=(self.username, self.password), timeout=15, allow_redirects=True, verify=False)
|
|
||||||
if "access_token=" not in r.url:
|
|
||||||
raise Exception("No se encontró access_token en la URL de respuesta")
|
|
||||||
token = r.url.split("access_token=")[1].split("&")[0]
|
|
||||||
return token
|
|
||||||
except Exception as ex:
|
|
||||||
logging.error(f"No se pudo obtener token: {ex}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def get_api_url(self, path: str, *parameters: tuple[str, str]) -> str:
|
|
||||||
url = self.api_url + path
|
|
||||||
if parameters:
|
|
||||||
url += '?' + urllib.parse.urlencode(
|
|
||||||
parameters, doseq=True, safe='[]', quote_via=urllib.parse.quote_plus
|
|
||||||
)
|
|
||||||
return url
|
|
||||||
|
|
||||||
def do_request(
|
|
||||||
self,
|
|
||||||
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
|
|
||||||
path: str,
|
|
||||||
*parameters: tuple[str, str],
|
|
||||||
data: typing.Any = None,
|
|
||||||
check_for_success: bool = False,
|
|
||||||
) -> typing.Any:
|
|
||||||
logger.debug(
|
|
||||||
'Requesting %s %s with parameters %s and data %s',
|
|
||||||
method.upper(),
|
|
||||||
path,
|
|
||||||
parameters,
|
|
||||||
data,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
match method:
|
|
||||||
case 'GET':
|
|
||||||
response = self.session.get(
|
|
||||||
self.get_api_url(path, *parameters),
|
|
||||||
timeout=self._timeout,
|
|
||||||
)
|
|
||||||
case 'POST':
|
|
||||||
response = self.session.post(
|
|
||||||
self.get_api_url(path, *parameters),
|
|
||||||
json=data,
|
|
||||||
timeout=self._timeout,
|
|
||||||
)
|
|
||||||
case 'PUT':
|
|
||||||
response = self.session.put(
|
|
||||||
self.get_api_url(path, *parameters),
|
|
||||||
json=data,
|
|
||||||
timeout=self._timeout,
|
|
||||||
)
|
|
||||||
case 'DELETE':
|
|
||||||
response = self.session.delete(
|
|
||||||
self.get_api_url(path, *parameters),
|
|
||||||
timeout=self._timeout,
|
|
||||||
)
|
|
||||||
case _:
|
|
||||||
raise ValueError(f'Unsupported HTTP method: {method}')
|
|
||||||
except requests.ConnectionError as e:
|
|
||||||
raise exceptions.OpenshiftConnectionError(str(e))
|
|
||||||
except requests.RequestException as e:
|
|
||||||
raise exceptions.OpenshiftError(f'Error during request: {str(e)}')
|
|
||||||
logger.debug('Request result to %s: %s -- %s', path, response.status_code, response.content[:64])
|
|
||||||
|
|
||||||
if not response.ok:
|
|
||||||
if response.status_code == 401:
|
|
||||||
# Unauthorized, try to refresh the token
|
|
||||||
logger.debug('Unauthorized request, refreshing token')
|
|
||||||
self._session = None
|
|
||||||
raise exceptions.OpenshiftAuthError(
|
|
||||||
'Unauthorized request, please check your credentials or token expiry'
|
|
||||||
)
|
|
||||||
elif response.status_code == 403:
|
|
||||||
# Forbidden, user does not have permissions
|
|
||||||
logger.debug('Forbidden request, check your permissions')
|
|
||||||
raise exceptions.OpenshiftPermissionError('Forbidden request, please check your permissions')
|
|
||||||
elif response.status_code == 404:
|
|
||||||
# Not found, resource does not exist
|
|
||||||
logger.debug('Resource not found: %s', path)
|
|
||||||
raise exceptions.OpenshiftNotFoundError(f'Resource not found: {path}')
|
|
||||||
|
|
||||||
error_message = f'Error on request {method.upper()} {path}: {response.status_code} - {response.content.decode("utf8")[:128]}'
|
|
||||||
logger.debug(error_message)
|
|
||||||
raise exceptions.OpenshiftError(error_message)
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = response.json()
|
|
||||||
except Exception as e:
|
|
||||||
error_message = f'Error parsing JSON response from {method.upper()} {path}: {str(e)}'
|
|
||||||
logger.debug(error_message)
|
|
||||||
raise exceptions.OpenshiftError(error_message)
|
|
||||||
|
|
||||||
if check_for_success and not data.get('success', False):
|
|
||||||
error_message = f'Error on request {method.upper()} {path}: {data.get("error", "Unknown error")}'
|
|
||||||
logger.debug(error_message)
|
|
||||||
raise exceptions.OpenshiftError(error_message)
|
|
||||||
|
|
||||||
return data
|
|
||||||
|
|
||||||
def do_paginated_request(
|
|
||||||
self,
|
|
||||||
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
|
|
||||||
path: str,
|
|
||||||
key: str,
|
|
||||||
*parameters: tuple[str, str],
|
|
||||||
data: typing.Any = None,
|
|
||||||
) -> typing.Iterator[typing.Any]:
|
|
||||||
"""
|
|
||||||
Make a paginated request to the Openshift API.
|
|
||||||
Args:
|
|
||||||
method (str): HTTP method to use (GET, POST, PUT, DELETE)
|
|
||||||
path (str): API endpoint path
|
|
||||||
*parameters: Additional parameters to include in the request
|
|
||||||
data (Any): Data to send with the request (for POST/PUT)
|
|
||||||
Yields:
|
|
||||||
typing.Any: The JSON response from each page of the request
|
|
||||||
|
|
||||||
Note:
|
|
||||||
The responses has also the "meta" key, which contains pagination information:
|
|
||||||
offset: int64
|
|
||||||
max: int64
|
|
||||||
size: int64
|
|
||||||
total: int64
|
|
||||||
|
|
||||||
This information is used to determine if there are more pages to fetch.
|
|
||||||
If not present, we try our best by counting the number of items returned
|
|
||||||
and comparing it with the items requested per page (consts.MAX_ITEMS_PER_REQUEST).
|
|
||||||
"""
|
|
||||||
offset = 0
|
|
||||||
while True:
|
|
||||||
params: list[tuple[str, str]] = [i for i in parameters] + [
|
|
||||||
('max', str(consts.MAX_ITEMS_PER_REQUEST)),
|
|
||||||
('offset', str(offset)),
|
|
||||||
]
|
|
||||||
response = self.do_request(method, path, *params, data=data)
|
|
||||||
data = response.get(key, [])
|
|
||||||
yield from data
|
|
||||||
|
|
||||||
# Checke meta information to see if we have more pages
|
|
||||||
meta = response.get('meta', {})
|
|
||||||
if not meta: # Do our best to avoid errors if meta is not present
|
|
||||||
# Check if we have more pages
|
|
||||||
if len(data) < consts.MAX_ITEMS_PER_REQUEST:
|
|
||||||
break
|
|
||||||
elif meta.get('offset', 0) + meta.get('size', 0) >= meta.get('total', 0):
|
|
||||||
# No more pages, as offset is greater than or equal to total
|
|
||||||
break
|
|
||||||
|
|
||||||
offset += consts.MAX_ITEMS_PER_REQUEST
|
|
||||||
|
|
||||||
#* --- OpenShift resource Methods ---
|
|
||||||
|
|
||||||
def get_vm_by_name(self, vm_name: str) -> types.VMDefinition | None:
|
|
||||||
path = f"apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines/{vm_name}"
|
|
||||||
try:
|
|
||||||
response = self.do_request('GET', path)
|
|
||||||
return types.VMDefinition.from_dict(response) # Convertir a VMDefinition aquí
|
|
||||||
except exceptions.OpenshiftNotFoundError:
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
logger.info(f"Error getting VM {vm_name}: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def monitor_vm_clone(self, api_url: str, namespace: str, clone_name: str, polling_interval: int = 5) -> None:
|
|
||||||
clone_url = f"{api_url}/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
|
|
||||||
logging.info("Monitoring clone process for '%s'...", clone_name)
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
response = requests.get(clone_url, headers=headers, verify=False)
|
|
||||||
if response.status_code == 200:
|
|
||||||
clone_data = response.json()
|
|
||||||
status = clone_data.get('status', {})
|
|
||||||
phase = status.get('phase', 'Unknown')
|
|
||||||
logging.info("Phase: %s", phase)
|
|
||||||
for condition in status.get('conditions', []):
|
|
||||||
ctype = condition.get('type', '')
|
|
||||||
cstatus = condition.get('status', '')
|
|
||||||
cmsg = condition.get('message', '')
|
|
||||||
logging.info(" %s: %s - %s", ctype, cstatus, cmsg)
|
|
||||||
if phase == 'Succeeded':
|
|
||||||
logging.info("Clone '%s' completed successfully!", clone_name)
|
|
||||||
break
|
|
||||||
elif phase == 'Failed':
|
|
||||||
logging.error("Clone '%s' failed!", clone_name)
|
|
||||||
break
|
|
||||||
elif response.status_code == 404:
|
|
||||||
logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name)
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
logging.error("Error monitoring clone: %d", response.status_code)
|
|
||||||
except Exception as e:
|
|
||||||
logging.error("Monitoring exception: %s", e)
|
|
||||||
logging.info("Waiting %d seconds before next check...", polling_interval)
|
|
||||||
time.sleep(polling_interval)
|
|
||||||
|
|
||||||
def get_vm_pvc_or_dv_name(self, api_url: str, namespace: str, vm_name: str) -> Tuple[str, str]:
|
|
||||||
vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
|
|
||||||
response = requests.get(vm_url, headers=headers, verify=False)
|
|
||||||
if response.status_code == 200:
|
|
||||||
vm_obj = response.json()
|
|
||||||
volumes = vm_obj.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", [])
|
|
||||||
for vol in volumes:
|
|
||||||
pvc = vol.get("persistentVolumeClaim")
|
|
||||||
if pvc:
|
|
||||||
return pvc.get("claimName"), "pvc"
|
|
||||||
dv = vol.get("dataVolume")
|
|
||||||
if dv:
|
|
||||||
return dv.get("name"), "dv"
|
|
||||||
raise Exception(f"No PVC or DataVolume found in VM {vm_name}")
|
|
||||||
|
|
||||||
def get_datavolume_size(self, api_url: str, namespace: str, dv_name: str) -> str:
|
|
||||||
url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
|
|
||||||
response = requests.get(url, headers=headers, verify=False)
|
|
||||||
if response.status_code == 200:
|
|
||||||
dv = response.json()
|
|
||||||
size = dv.get("status", {}).get("amount", None)
|
|
||||||
if size:
|
|
||||||
return size
|
|
||||||
return dv.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or ""
|
|
||||||
raise Exception(f"No se pudo obtener el tamaño del DataVolume {dv_name}")
|
|
||||||
|
|
||||||
def get_pvc_size(self, api_url: str, namespace: str, pvc_name: str) -> str:
|
|
||||||
url = f"{api_url}/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
|
|
||||||
response = requests.get(url, headers=headers, verify=False)
|
|
||||||
if response.status_code == 200:
|
|
||||||
pvc = response.json()
|
|
||||||
capacity = pvc.get("status", {}).get("capacity", {}).get("storage")
|
|
||||||
if capacity:
|
|
||||||
return capacity
|
|
||||||
raise Exception(f"No se pudo obtener el tamaño del PVC {pvc_name}")
|
|
||||||
|
|
||||||
def clone_pvc_with_datavolume(self, api_url: str, namespace: str, source_pvc_name: str, cloned_pvc_name: str, storage_class: str, storage_size: str) -> bool:
|
|
||||||
dv_url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes"
|
|
||||||
headers = {
|
|
||||||
"Authorization": f"Bearer {token}",
|
|
||||||
"Accept": "application/json",
|
|
||||||
"Content-Type": "application/json"
|
|
||||||
}
|
|
||||||
body: Dict[str, Any] = {
|
|
||||||
"apiVersion": "cdi.kubevirt.io/v1beta1",
|
|
||||||
"kind": "DataVolume",
|
|
||||||
"metadata": {"name": cloned_pvc_name, "namespace": namespace},
|
|
||||||
"spec": {
|
|
||||||
"source": {
|
|
||||||
"pvc": {"name": source_pvc_name, "namespace": namespace}
|
|
||||||
},
|
|
||||||
"pvc": {
|
|
||||||
"accessModes": ["ReadWriteOnce"],
|
|
||||||
"resources": {"requests": {"storage": storage_size}},
|
|
||||||
"storageClassName": storage_class
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
response = requests.post(dv_url, headers=headers, json=body, verify=False)
|
|
||||||
if response.status_code == 201:
|
|
||||||
logging.info(f"DataVolume '{cloned_pvc_name}' creado exitosamente")
|
|
||||||
return True
|
|
||||||
logging.error(f"Fallo al crear DataVolume: {response.status_code} {response.text}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def create_vm_from_pvc(self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, new_dv_name: str, source_pvc_name: str) -> bool:
|
|
||||||
original_vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
|
|
||||||
resp = requests.get(original_vm_url, headers=headers, verify=False)
|
|
||||||
if resp.status_code != 200:
|
|
||||||
logging.error(f"No se pudo obtener la VM origen: {resp.status_code} {resp.text}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
vm_obj = resp.json()
|
|
||||||
vm_obj['metadata']['name'] = new_vm_name
|
|
||||||
|
|
||||||
for k in ['resourceVersion', 'uid', 'selfLink']:
|
|
||||||
vm_obj['metadata'].pop(k, None)
|
|
||||||
vm_obj.pop('status', None)
|
|
||||||
|
|
||||||
vm_obj['spec'].pop('running', None)
|
|
||||||
vm_obj['spec']['runStrategy'] = 'Always'
|
|
||||||
|
|
||||||
for vol in vm_obj['spec']['template']['spec']['volumes']:
|
|
||||||
if 'dataVolume' in vol:
|
|
||||||
vol['dataVolume']['name'] = new_dv_name
|
|
||||||
elif 'persistentVolumeClaim' in vol:
|
|
||||||
vol['persistentVolumeClaim']['claimName'] = new_dv_name
|
|
||||||
|
|
||||||
vm_obj['spec']['dataVolumeTemplates'] = [{
|
|
||||||
"metadata": {"name": new_dv_name},
|
|
||||||
"spec": {
|
|
||||||
"source": {"pvc": {"name": source_pvc_name}},
|
|
||||||
"pvc": {
|
|
||||||
"accessModes": ["ReadWriteOnce"],
|
|
||||||
"resources": {"requests": {"storage": "399Gi"}},
|
|
||||||
"storageClassName": "crc-csi-hostpath-provisioner"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}]
|
|
||||||
|
|
||||||
interfaces = vm_obj.get('spec', {}).get('template', {}).get('spec', {}).get('domain', {}).get('devices', {}).get('interfaces', [])
|
|
||||||
for iface in interfaces:
|
|
||||||
iface.pop('macAddress', None)
|
|
||||||
|
|
||||||
create_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines"
|
|
||||||
headers["Content-Type"] = "application/json"
|
|
||||||
resp = requests.post(create_url, headers=headers, json=vm_obj, verify=False)
|
|
||||||
if resp.status_code == 201:
|
|
||||||
logging.info(f"VM '{new_vm_name}' creada correctamente con DataVolumeTemplate.")
|
|
||||||
return True
|
|
||||||
logging.error(f"Error creando VM: {resp.status_code} {resp.text}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def wait_for_datavolume_clone_progress(self, api_url: str, namespace: str, datavolume_name: str, timeout: int = 3000, polling_interval: int = 5) -> bool:
|
|
||||||
url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}"
|
|
||||||
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
|
|
||||||
start = time.time()
|
|
||||||
while time.time() - start < timeout:
|
|
||||||
response = requests.get(url, headers=headers, verify=False)
|
|
||||||
if response.status_code == 200:
|
|
||||||
dv = response.json()
|
|
||||||
status = dv.get('status', {})
|
|
||||||
phase = status.get('phase')
|
|
||||||
progress = status.get('progress', 'N/A')
|
|
||||||
logging.info(f"DataVolume {datavolume_name} status: {phase}, progress: {progress}")
|
|
||||||
if phase == 'Succeeded':
|
|
||||||
logging.info(f"DataVolume {datavolume_name} clonación completada")
|
|
||||||
return True
|
|
||||||
elif phase == 'Failed':
|
|
||||||
logging.error(f"DataVolume {datavolume_name} clonación fallida")
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
logging.error(f"Error consultando DataVolume {datavolume_name}: {response.status_code}")
|
|
||||||
time.sleep(polling_interval)
|
|
||||||
logging.error(f"Timeout esperando clonación de DataVolume {datavolume_name}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def start_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
|
|
||||||
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
|
|
||||||
headers = {
|
|
||||||
"Authorization": f"Bearer {token}",
|
|
||||||
"Content-Type": "application/merge-patch+json",
|
|
||||||
"Accept": "application/json"
|
|
||||||
}
|
|
||||||
body: Dict[str, Any] = {
|
|
||||||
"spec": {
|
|
||||||
"runStrategy": "Always"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
response = requests.patch(url, headers=headers, json=body, verify=False)
|
|
||||||
if response.status_code in [200, 201]:
|
|
||||||
logging.info(f"VM {vm_name} puesta en marcha.")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logging.info(f"Error al arrancar la VM {vm_name}: {response.status_code} - {response.text}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def stop_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
|
|
||||||
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
|
|
||||||
headers = {
|
|
||||||
"Authorization": f"Bearer {token}",
|
|
||||||
"Content-Type": "application/merge-patch+json",
|
|
||||||
"Accept": "application/json"
|
|
||||||
}
|
|
||||||
body: Dict[str, Any] = {
|
|
||||||
"spec": {
|
|
||||||
"runStrategy": "Halted"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
response = requests.patch(url, headers=headers, json=body, verify=False)
|
|
||||||
if response.status_code in [200, 201]:
|
|
||||||
logging.info(f"VM {vm_name} se apagará.")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logging.info(f"Error al apagar la VM {vm_name}: {response.status_code} - {response.text}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def copy_vm_same_size(self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, storage_class: str) -> None:
|
|
||||||
source_pvc_name, vol_type = self.get_vm_pvc_or_dv_name(api_url, namespace, source_vm_name)
|
|
||||||
size = self.get_pvc_size(api_url, namespace, source_pvc_name)
|
|
||||||
new_pvc_name = f"{new_vm_name}-disk"
|
|
||||||
if self.clone_pvc_with_datavolume(api_url, namespace, source_pvc_name, new_pvc_name, storage_class, size):
|
|
||||||
self.create_vm_from_pvc(api_url, namespace, source_vm_name, new_vm_name, new_pvc_name, source_pvc_name)
|
|
||||||
else:
|
|
||||||
logging.error("Error al clonar PVC")
|
|
||||||
|
|
||||||
|
|
||||||
# @cached('test', consts.CACHE_VM_INFO_DURATION)
|
|
||||||
def test(self) -> bool:
|
|
||||||
# Simple test: try to enumerate VMs to check connectivity and authentication
|
|
||||||
try:
|
|
||||||
vm_url = f"{self.api_url}/apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines"
|
|
||||||
headers = {'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/json'}
|
|
||||||
response = requests.get(vm_url, headers=headers, verify=self._verify_ssl, timeout=self._timeout)
|
|
||||||
response.raise_for_status()
|
|
||||||
logger.debug('Successfully enumerated VMs for test')
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error testing Openshift by enumerating VMs: {e}")
|
|
||||||
raise exceptions.OpenshiftConnectionError(str(e)) from e
|
|
||||||
return False
|
|
||||||
|
|
||||||
def enumerate_vms(self) -> collections.abc.Iterator[types.VMDefinition]:
|
|
||||||
"""
|
|
||||||
Fetch all VMs from KubeVirt API in the current namespace as VMDefinition objects using do_paginated_request.
|
|
||||||
"""
|
|
||||||
yield from (
|
|
||||||
types.VMDefinition.from_dict(vm)
|
|
||||||
for vm in self.do_paginated_request(
|
|
||||||
'GET',
|
|
||||||
f'apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines',
|
|
||||||
'items'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@cached('vms', consts.CACHE_INFO_DURATION)
|
|
||||||
def list_vms(self) -> list[types.VMDefinition]:
|
|
||||||
"""
|
|
||||||
List all VMs in the current namespace as VMDefinition objects.
|
|
||||||
"""
|
|
||||||
return list(self.enumerate_vms())
|
|
||||||
|
|
||||||
@cached('instance', consts.CACHE_VM_INFO_DURATION)
|
|
||||||
def get_instance_info(self, vm_name: str, force: bool = False) -> dict | None:
|
|
||||||
"""
|
|
||||||
Get a specific VM by name in the current namespace.
|
|
||||||
Returns the VM dict if found, else None.
|
|
||||||
"""
|
|
||||||
return self.get_vm_by_name(vm_name)
|
|
||||||
|
|
||||||
def start_instance(self, vm_name: str) -> bool:
|
|
||||||
"""
|
|
||||||
Start a specific VM by name.
|
|
||||||
Returns True if started successfully, False otherwise.
|
|
||||||
"""
|
|
||||||
return self.start_vm(self.api_url, self.namespace, vm_name)
|
|
||||||
|
|
||||||
def stop_instance(self, vm_name: str) -> bool:
|
|
||||||
"""
|
|
||||||
Stop a specific VM by name.
|
|
||||||
Returns True if stopped successfully, False otherwise.
|
|
||||||
"""
|
|
||||||
return self.stop_vm(self.api_url, self.namespace, vm_name)
|
|
||||||
|
|
||||||
def clone_instance(self, source_vm_name: str, new_vm_name: str, storage_class: str) -> bool:
|
|
||||||
"""
|
|
||||||
Clone a VM by name, creating a new VM with the same size.
|
|
||||||
Returns True if clone succeeded, False otherwise.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
self.copy_vm_same_size(self.api_url, self.namespace, source_vm_name, new_vm_name, storage_class)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"Error cloning VM: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def validate_vm_id(vm_id: str | int) -> None:
|
|
||||||
try:
|
|
||||||
int(vm_id)
|
|
||||||
except ValueError:
|
|
||||||
raise exceptions.OpenshiftNotFoundError(f'VM {vm_id} not found')
|
|
||||||
|
|
||||||
@@ -9,4 +9,4 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
|
|||||||
# pyright: reportUnusedImport=false
|
# pyright: reportUnusedImport=false
|
||||||
from uds.core import managers
|
from uds.core import managers
|
||||||
|
|
||||||
from .provider import OpenshiftProvider
|
from .provider import MorpheusProvider
|
||||||
@@ -101,20 +101,29 @@ class OpenshiftUserService(DynamicUserService, autoserializable.AutoSerializable
|
|||||||
|
|
||||||
def op_create_checker(self) -> types.states.TaskState:
|
def op_create_checker(self) -> types.states.TaskState:
|
||||||
"""
|
"""
|
||||||
Checks the state of a deploy for a user or cache, robust and delegando a OpenshiftClient.
|
Checks the state of a deploy for an user or cache
|
||||||
"""
|
"""
|
||||||
|
# Wait until we have created the vm is _name is not emtpy
|
||||||
if self._waiting_name:
|
if self._waiting_name:
|
||||||
# Buscar la VM clonada por nombre usando enumerate_instances
|
found_vms = self.service().api.list_instances(name=self._vm_name(), force=True) # Do not use cache
|
||||||
vms = self.service().api.enumerate_instances()
|
if not found_vms:
|
||||||
found = [vm for vm in vms if vm.get('metadata', {}).get('name') == self._vm_name()]
|
|
||||||
if not found:
|
|
||||||
return types.states.TaskState.RUNNING
|
return types.states.TaskState.RUNNING
|
||||||
self._vmid = found[0].get('metadata', {}).get('uid', '')
|
else: # We have the instance, clear _waiting_name and set _vmid
|
||||||
self._waiting_name = False
|
self._vmid = str(found_vms[0].id)
|
||||||
|
self._waiting_name = False
|
||||||
|
|
||||||
instance = self.service().api.get_instance_info(self._vmid)
|
instance = self.service().api.get_instance_info(self._vmid)
|
||||||
if not instance.interfaces or getattr(instance.interfaces[0], 'mac_address', '') == '':
|
|
||||||
|
# If the instace has a mac address, consider it as created and
|
||||||
|
# let Openshift to finish the provisioning as it needs to do
|
||||||
|
if not instance.interfaces or instance.interfaces[0].mac_address == '':
|
||||||
|
# No interfaces, so we cannot use it yet
|
||||||
return types.states.TaskState.RUNNING
|
return types.states.TaskState.RUNNING
|
||||||
|
|
||||||
|
# if not instance.status.is_provisioning():
|
||||||
|
# # If instance is running, we can finish the publication
|
||||||
|
# return types.states.TaskState.FINISHED
|
||||||
|
|
||||||
return types.states.TaskState.FINISHED
|
return types.states.TaskState.FINISHED
|
||||||
|
|
||||||
# In fact, we probably don't need to check task status, but this way we can include the error
|
# In fact, we probably don't need to check task status, but this way we can include the error
|
||||||
@@ -35,7 +35,7 @@ from uds.core import types
|
|||||||
from uds.core.services.generics.fixed.userservice import FixedUserService
|
from uds.core.services.generics.fixed.userservice import FixedUserService
|
||||||
from uds.core.util import autoserializable
|
from uds.core.util import autoserializable
|
||||||
|
|
||||||
from .openshift import types as morph_types
|
from .openshift import types as openshift_types
|
||||||
|
|
||||||
# Not imported at runtime, just for type checking
|
# Not imported at runtime, just for type checking
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
@@ -82,7 +82,7 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
|
|||||||
self.service().api.stop_instance(self._vmid)
|
self.service().api.stop_instance(self._vmid)
|
||||||
|
|
||||||
# Check methods
|
# Check methods
|
||||||
def _check_status(self, *status: morph_types.InstanceStatus) -> types.states.TaskState:
|
def _check_status(self, *status: openshift_types.InstanceStatus) -> types.states.TaskState:
|
||||||
"""
|
"""
|
||||||
Checks the status of the instance and returns the appropriate TaskState.
|
Checks the status of the instance and returns the appropriate TaskState.
|
||||||
"""
|
"""
|
||||||
@@ -101,8 +101,8 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
|
|||||||
Checks if machine has started
|
Checks if machine has started
|
||||||
"""
|
"""
|
||||||
return self._check_status(
|
return self._check_status(
|
||||||
morph_types.InstanceStatus.RUNNING,
|
openshift_types.InstanceStatus.RUNNING,
|
||||||
morph_types.InstanceStatus.PROVISIONING,
|
openshift_types.InstanceStatus.PROVISIONING,
|
||||||
)
|
)
|
||||||
|
|
||||||
def op_stop_checker(self) -> types.states.TaskState:
|
def op_stop_checker(self) -> types.states.TaskState:
|
||||||
@@ -110,6 +110,6 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
|
|||||||
Checks if machine has stoped
|
Checks if machine has stoped
|
||||||
"""
|
"""
|
||||||
return self._check_status(
|
return self._check_status(
|
||||||
morph_types.InstanceStatus.STOPPED,
|
openshift_types.InstanceStatus.STOPPED,
|
||||||
morph_types.InstanceStatus.SUSPENDED,
|
openshift_types.InstanceStatus.SUSPENDED,
|
||||||
)
|
)
|
||||||
511
server/src/uds/services/Openshift/openshift/client.py
Normal file
511
server/src/uds/services/Openshift/openshift/client.py
Normal file
@@ -0,0 +1,511 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2025 Virtual Cable S.L.U.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# Redistribution and use in source and binary forms, with or without modification,
|
||||||
|
# are permitted provided that the following conditions are met:
|
||||||
|
#
|
||||||
|
# * Redistributions of source code must retain the above copyright notice,
|
||||||
|
# this list of conditions and the following disclaimer.
|
||||||
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
||||||
|
# this list of conditions and the following disclaimer in the documentation
|
||||||
|
# and/or other materials provided with the distribution.
|
||||||
|
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
|
||||||
|
# may be used to endorse or promote products derived from this software
|
||||||
|
# without specific prior written permission.
|
||||||
|
#
|
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||||
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||||
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||||
|
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||||
|
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||||
|
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||||
|
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||||
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||||
|
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
"""
|
||||||
|
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||||
|
"""
|
||||||
|
import collections.abc
|
||||||
|
import typing
|
||||||
|
import datetime
|
||||||
|
import urllib.parse
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
from uds.core.util import security
|
||||||
|
from uds.core.util.cache import Cache
|
||||||
|
from uds.core.util.decorators import cached
|
||||||
|
from uds.core.util.model import sql_now
|
||||||
|
|
||||||
|
from . import types, consts, exceptions
|
||||||
|
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# caching helper
|
||||||
|
def caching_key_helper(obj: 'OpenshiftClient') -> str:
|
||||||
|
return obj._host # pylint: disable=protected-access
|
||||||
|
|
||||||
|
|
||||||
|
class OpenshiftClient:
|
||||||
|
_host: str
|
||||||
|
_port: int
|
||||||
|
_username: str
|
||||||
|
_password: str
|
||||||
|
_verify_ssl: bool
|
||||||
|
_timeout: int
|
||||||
|
_url: str
|
||||||
|
_access_token: str
|
||||||
|
_token_expiry: datetime.datetime
|
||||||
|
|
||||||
|
_session: typing.Optional[requests.Session] = None
|
||||||
|
|
||||||
|
cache: typing.Optional['Cache']
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
timeout: int = 5,
|
||||||
|
verify_ssl: bool = False,
|
||||||
|
cache: typing.Optional['Cache'] = None,
|
||||||
|
client_id: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._username = username
|
||||||
|
self._password = password
|
||||||
|
|
||||||
|
self._verify_ssl = verify_ssl
|
||||||
|
self._timeout = timeout
|
||||||
|
self._client_id = client_id or 'morph-api'
|
||||||
|
|
||||||
|
self.cache = cache
|
||||||
|
|
||||||
|
self._access_token = self._refresh_token = ''
|
||||||
|
self._token_expiry = datetime.datetime.min
|
||||||
|
|
||||||
|
self._url = f'https://{host}' + (f':{port}' if port != 443 else '') + '/'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> requests.Session:
|
||||||
|
return self.connect()
|
||||||
|
|
||||||
|
def connect(self, force: bool = False) -> requests.Session:
|
||||||
|
now = sql_now()
|
||||||
|
if self._access_token and self._session and self._token_expiry > now and not force:
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
session = self._session = security.secure_requests_session(verify=self._verify_ssl)
|
||||||
|
self._session.headers.update(
|
||||||
|
{
|
||||||
|
'Accept': 'application/json',
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def set_auth_header() -> requests.Session:
|
||||||
|
"""Set the Authorization header with the given token."""
|
||||||
|
session.headers.update({'Authorization': f'Bearer {self._access_token}'})
|
||||||
|
return session
|
||||||
|
|
||||||
|
# If we have an access token, use it as bearer token
|
||||||
|
# If the token is expired, we will refresh it
|
||||||
|
# If force is True, we will refresh the token even if it is not expired
|
||||||
|
if self._access_token and self._token_expiry > now and not force:
|
||||||
|
return set_auth_header()
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = session.post(
|
||||||
|
# ?client_id=morph-api&grant_type=password&scope=write
|
||||||
|
url=self.get_api_url(
|
||||||
|
'oauth/token',
|
||||||
|
('client_id', self._client_id),
|
||||||
|
('grant_type', 'password'),
|
||||||
|
('scope', 'write'),
|
||||||
|
),
|
||||||
|
data={
|
||||||
|
'username': self._username,
|
||||||
|
'password': self._password,
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'Accept': 'application/json',
|
||||||
|
'Content-Type': 'application/x-www-form-urlencoded',
|
||||||
|
},
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
if not result.ok:
|
||||||
|
raise exceptions.OpenshiftAuthError(result.content.decode('utf8'))
|
||||||
|
data = result.json()
|
||||||
|
self._access_token = data['access_token']
|
||||||
|
# self._refresh_token = data['refresh_token'] # Not used, but could be used to refresh the token
|
||||||
|
self._token_expiry = datetime.datetime.now() + datetime.timedelta(
|
||||||
|
seconds=data.get('expires_in', 3600)
|
||||||
|
) # Default to 1 hour if not provided
|
||||||
|
return set_auth_header()
|
||||||
|
except requests.RequestException as e:
|
||||||
|
raise exceptions.OpenshiftConnectionError(str(e)) from e
|
||||||
|
|
||||||
|
def get_api_url(self, path: str, *parameters: tuple[str, str]) -> str:
|
||||||
|
url = self._url + path
|
||||||
|
if parameters:
|
||||||
|
url += '?' + urllib.parse.urlencode(
|
||||||
|
parameters, doseq=True, safe='[]', quote_via=urllib.parse.quote_plus
|
||||||
|
)
|
||||||
|
return url
|
||||||
|
|
||||||
|
def do_request(
|
||||||
|
self,
|
||||||
|
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
|
||||||
|
path: str,
|
||||||
|
*parameters: tuple[str, str],
|
||||||
|
data: typing.Any = None,
|
||||||
|
check_for_success: bool = False,
|
||||||
|
) -> typing.Any:
|
||||||
|
logger.debug(
|
||||||
|
'Requesting %s %s with parameters %s and data %s',
|
||||||
|
method.upper(),
|
||||||
|
path,
|
||||||
|
parameters,
|
||||||
|
data,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
match method:
|
||||||
|
case 'GET':
|
||||||
|
response = self.session.get(
|
||||||
|
self.get_api_url(path, *parameters),
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
case 'POST':
|
||||||
|
response = self.session.post(
|
||||||
|
self.get_api_url(path, *parameters),
|
||||||
|
json=data,
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
case 'PUT':
|
||||||
|
response = self.session.put(
|
||||||
|
self.get_api_url(path, *parameters),
|
||||||
|
json=data,
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
case 'DELETE':
|
||||||
|
response = self.session.delete(
|
||||||
|
self.get_api_url(path, *parameters),
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
case _:
|
||||||
|
raise ValueError(f'Unsupported HTTP method: {method}')
|
||||||
|
except requests.ConnectionError as e:
|
||||||
|
raise exceptions.OpenshiftConnectionError(str(e))
|
||||||
|
except requests.RequestException as e:
|
||||||
|
raise exceptions.OpenshiftError(f'Error during request: {str(e)}')
|
||||||
|
logger.debug('Request result to %s: %s -- %s', path, response.status_code, response.content[:64])
|
||||||
|
|
||||||
|
if not response.ok:
|
||||||
|
if response.status_code == 401:
|
||||||
|
# Unauthorized, try to refresh the token
|
||||||
|
logger.debug('Unauthorized request, refreshing token')
|
||||||
|
self._session = None
|
||||||
|
raise exceptions.OpenshiftAuthError(
|
||||||
|
'Unauthorized request, please check your credentials or token expiry'
|
||||||
|
)
|
||||||
|
elif response.status_code == 403:
|
||||||
|
# Forbidden, user does not have permissions
|
||||||
|
logger.debug('Forbidden request, check your permissions')
|
||||||
|
raise exceptions.OpenshiftPermissionError('Forbidden request, please check your permissions')
|
||||||
|
elif response.status_code == 404:
|
||||||
|
# Not found, resource does not exist
|
||||||
|
logger.debug('Resource not found: %s', path)
|
||||||
|
raise exceptions.OpenshiftNotFoundError(f'Resource not found: {path}')
|
||||||
|
|
||||||
|
error_message = f'Error on request {method.upper()} {path}: {response.status_code} - {response.content.decode("utf8")[:128]}'
|
||||||
|
logger.debug(error_message)
|
||||||
|
raise exceptions.OpenshiftError(error_message)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = response.json()
|
||||||
|
except Exception as e:
|
||||||
|
error_message = f'Error parsing JSON response from {method.upper()} {path}: {str(e)}'
|
||||||
|
logger.debug(error_message)
|
||||||
|
raise exceptions.OpenshiftError(error_message)
|
||||||
|
|
||||||
|
if check_for_success and not data.get('success', False):
|
||||||
|
error_message = f'Error on request {method.upper()} {path}: {data.get("error", "Unknown error")}'
|
||||||
|
logger.debug(error_message)
|
||||||
|
raise exceptions.OpenshiftError(error_message)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def do_paginated_request(
|
||||||
|
self,
|
||||||
|
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
|
||||||
|
path: str,
|
||||||
|
key: str,
|
||||||
|
*parameters: tuple[str, str],
|
||||||
|
data: typing.Any = None,
|
||||||
|
) -> typing.Iterator[typing.Any]:
|
||||||
|
"""
|
||||||
|
Make a paginated request to the Openshift API.
|
||||||
|
Args:
|
||||||
|
method (str): HTTP method to use (GET, POST, PUT, DELETE)
|
||||||
|
path (str): API endpoint path
|
||||||
|
*parameters: Additional parameters to include in the request
|
||||||
|
data (Any): Data to send with the request (for POST/PUT)
|
||||||
|
Yields:
|
||||||
|
typing.Any: The JSON response from each page of the request
|
||||||
|
|
||||||
|
Note:
|
||||||
|
The responses has also the "meta" key, which contains pagination information:
|
||||||
|
offset: int64
|
||||||
|
max: int64
|
||||||
|
size: int64
|
||||||
|
total: int64
|
||||||
|
|
||||||
|
This information is used to determine if there are more pages to fetch.
|
||||||
|
If not present, we try our best by counting the number of items returned
|
||||||
|
and comparing it with the items requested per page (consts.MAX_ITEMS_PER_REQUEST).
|
||||||
|
"""
|
||||||
|
offset = 0
|
||||||
|
while True:
|
||||||
|
params: list[tuple[str, str]] = [i for i in parameters] + [
|
||||||
|
('max', str(consts.MAX_ITEMS_PER_REQUEST)),
|
||||||
|
('offset', str(offset)),
|
||||||
|
]
|
||||||
|
response = self.do_request(method, path, *params, data=data)
|
||||||
|
data = response.get(key, [])
|
||||||
|
yield from data
|
||||||
|
|
||||||
|
# Checke meta information to see if we have more pages
|
||||||
|
meta = response.get('meta', {})
|
||||||
|
if not meta: # Do our best to avoid errors if meta is not present
|
||||||
|
# Check if we have more pages
|
||||||
|
if len(data) < consts.MAX_ITEMS_PER_REQUEST:
|
||||||
|
break
|
||||||
|
elif meta.get('offset', 0) + meta.get('size', 0) >= meta.get('total', 0):
|
||||||
|
# No more pages, as offset is greater than or equal to total
|
||||||
|
break
|
||||||
|
|
||||||
|
offset += consts.MAX_ITEMS_PER_REQUEST
|
||||||
|
|
||||||
|
@cached('test', consts.CACHE_VM_INFO_DURATION, key_helper=caching_key_helper)
|
||||||
|
def test(self) -> bool:
|
||||||
|
try:
|
||||||
|
self.connect(force=True)
|
||||||
|
except Exception:
|
||||||
|
# logger.error('Error testing Openshift: %s', e)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Not cacheable, as it is a generator
|
||||||
|
def enumerate_instances(
|
||||||
|
self, *, name: str | None = None, detailed: bool = False, show_deleted: bool = False
|
||||||
|
) -> collections.abc.Iterator[types.InstanceInfo]:
|
||||||
|
"""
|
||||||
|
Get all instances from Openshift
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str|None): If provided, filter instances by name (case-insensitive?)
|
||||||
|
detailed (bool): If True, return detailed information about instances
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
types.Instance: An instance object with the details of each instance
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenshiftError: If there is an error getting the instances
|
||||||
|
"""
|
||||||
|
params = [
|
||||||
|
('showDeleted', str(show_deleted).lower()),
|
||||||
|
('details', str(detailed).lower()),
|
||||||
|
]
|
||||||
|
if name:
|
||||||
|
params.append(('name', name))
|
||||||
|
yield from (
|
||||||
|
types.InstanceInfo.from_dict(instance)
|
||||||
|
for instance in self.do_paginated_request('GET', 'api/instances', 'instances', *params)
|
||||||
|
)
|
||||||
|
|
||||||
|
@cached('instances', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
|
||||||
|
def list_instances(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
name: str | None = None,
|
||||||
|
detailed: bool = False,
|
||||||
|
show_deleted: bool = False,
|
||||||
|
force: bool = False,
|
||||||
|
) -> list[types.InstanceInfo]:
|
||||||
|
return list(self.enumerate_instances(name=name, detailed=detailed, show_deleted=show_deleted))
|
||||||
|
|
||||||
|
@cached('instance', consts.CACHE_VM_INFO_DURATION, key_helper=caching_key_helper)
|
||||||
|
def get_instance_info(self, instance_id: str | int, force: bool = False) -> types.InstanceInfo:
|
||||||
|
"""
|
||||||
|
Get a specific instance by ID
|
||||||
|
"""
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
|
||||||
|
response = self.do_request(
|
||||||
|
'GET',
|
||||||
|
f'api/instances/{instance_id}',
|
||||||
|
('details', 'true'),
|
||||||
|
)
|
||||||
|
return types.InstanceInfo.from_dict(response['instance'])
|
||||||
|
|
||||||
|
def clone_instance(self, instance_id: str | int, name: str, group_id: int | None = None) -> None:
|
||||||
|
"""
|
||||||
|
Clone a specific instance by ID
|
||||||
|
|
||||||
|
TODO: maybe we can change the network interface configuration
|
||||||
|
"networkInterfaces": [{ "network": { "id": "subnet-12542" } }]
|
||||||
|
"""
|
||||||
|
# "https://172.27.0.1/api/instances/{id}/clone"
|
||||||
|
# Params: name, group: {'id': id} (optional)
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
|
||||||
|
data: dict[str, typing.Any] = {
|
||||||
|
'name': name,
|
||||||
|
}
|
||||||
|
if group_id is not None:
|
||||||
|
data['group'] = {'id': group_id}
|
||||||
|
self.do_request(
|
||||||
|
'PUT',
|
||||||
|
f'api/instances/{instance_id}/clone',
|
||||||
|
data=data,
|
||||||
|
check_for_success=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def start_instance(self, instance_id: str | int) -> None:
|
||||||
|
"""
|
||||||
|
Start a specific instance by ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance_id (int): The ID of the instance to start
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenshiftError: If there is an error starting the instance
|
||||||
|
"""
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
|
||||||
|
self.do_request(
|
||||||
|
'PUT',
|
||||||
|
f'api/instances/{instance_id}/start',
|
||||||
|
check_for_success=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def stop_instance(self, instance_id: str | int) -> None:
|
||||||
|
"""
|
||||||
|
Stop a specific instance by ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance_id (int): The ID of the instance to stop
|
||||||
|
force (bool): If True, force stop the instance
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenshiftError: If there is an error stopping the instance
|
||||||
|
"""
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
|
||||||
|
self.do_request(
|
||||||
|
'PUT',
|
||||||
|
f'api/instances/{instance_id}/stop',
|
||||||
|
check_for_success=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def restart_instance(self, instance_id: str | int) -> None:
|
||||||
|
"""
|
||||||
|
Restart a specific instance by ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance_id (int): The ID of the instance to restart
|
||||||
|
force (bool): If True, force restart the instance
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenshiftError: If there is an error restarting the instance
|
||||||
|
"""
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
|
||||||
|
self.do_request(
|
||||||
|
'PUT',
|
||||||
|
f'api/instances/{instance_id}/restart',
|
||||||
|
check_for_success=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def delete_instance(self, instance_id: str | int, force: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Delete a specific instance by ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance_id (int): The ID of the instance to delete
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenshiftError: If there is an error deleting the instance
|
||||||
|
"""
|
||||||
|
OpenshiftClient.validate_instance_id(instance_id)
|
||||||
|
params = [('force', 'true')] if force else []
|
||||||
|
|
||||||
|
self.do_request(
|
||||||
|
'DELETE',
|
||||||
|
f'api/instances/{instance_id}',
|
||||||
|
*params,
|
||||||
|
check_for_success=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def enumerate_groups(self) -> collections.abc.Iterator[types.BasicInfo]:
|
||||||
|
"""
|
||||||
|
Get all groups from Openshift
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
types.IdValuePair: An IdValuePair object with the details of each group
|
||||||
|
"""
|
||||||
|
yield from (
|
||||||
|
types.BasicInfo.from_dict(group)
|
||||||
|
for group in self.do_paginated_request('GET', 'api/groups', 'groups')
|
||||||
|
)
|
||||||
|
|
||||||
|
@cached('groups', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
|
||||||
|
def list_groups(self, force: bool = False) -> list[types.BasicInfo]:
|
||||||
|
"""
|
||||||
|
List all groups available in Openshift
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[types.IdValuePair]: A list of IdValuePair objects representing the groups
|
||||||
|
"""
|
||||||
|
return list(self.enumerate_groups())
|
||||||
|
|
||||||
|
def enumerate_clouds(self, group_id: int | None = None) -> collections.abc.Iterator[types.BasicInfo]:
|
||||||
|
"""
|
||||||
|
Get all clouds from Openshift
|
||||||
|
Args:
|
||||||
|
group (str|None): If provided, filter clouds by group name
|
||||||
|
Yields:
|
||||||
|
types.BasicInfo: An IdValuePair object with the details of each cloud
|
||||||
|
"""
|
||||||
|
parameters: list[tuple[str, str]] = []
|
||||||
|
if group_id is not None:
|
||||||
|
parameters.append(('groupId', str(group_id)))
|
||||||
|
yield from (
|
||||||
|
types.BasicInfo.from_dict(cloud)
|
||||||
|
for cloud in self.do_paginated_request('GET', 'api/zones', 'zones', *parameters)
|
||||||
|
)
|
||||||
|
|
||||||
|
@cached('clouds', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
|
||||||
|
def list_clouds(self, group_id: int | None = None, force: bool = False) -> list[types.BasicInfo]:
|
||||||
|
"""
|
||||||
|
List all clouds available in Openshift
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[types.IdValuePair]: A list of IdValuePair objects representing the clouds
|
||||||
|
"""
|
||||||
|
return list(self.enumerate_clouds(group_id=group_id))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_instance_id(instance_id: str | int) -> None:
|
||||||
|
try:
|
||||||
|
int(instance_id)
|
||||||
|
except ValueError:
|
||||||
|
raise exceptions.OpenshiftNotFoundError(f'Instance {instance_id} not found')
|
||||||
@@ -1,11 +1,8 @@
|
|||||||
import collections.abc
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import enum
|
import enum
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from typing import TypedDict, Any
|
|
||||||
|
|
||||||
from . import exceptions
|
from . import exceptions
|
||||||
|
|
||||||
|
|
||||||
@@ -236,193 +233,3 @@ class InstanceInfo:
|
|||||||
except Exception as e: # Any exception during parsing will generate a "null"
|
except Exception as e: # Any exception during parsing will generate a "null"
|
||||||
logger.error(f'Error creating Instance from dict: {e}')
|
logger.error(f'Error creating Instance from dict: {e}')
|
||||||
return InstanceInfo.null()
|
return InstanceInfo.null()
|
||||||
|
|
||||||
|
|
||||||
#* --- OpenShift resource TypedDicts ---
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMMetadata:
|
|
||||||
name: str
|
|
||||||
namespace: str
|
|
||||||
uid: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMMetadata':
|
|
||||||
return VMMetadata(
|
|
||||||
name=dictionary.get('name', ''),
|
|
||||||
namespace=dictionary.get('namespace', ''),
|
|
||||||
uid=dictionary.get('uid', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMVolumeTemplate:
|
|
||||||
name: str
|
|
||||||
storage: str
|
|
||||||
storage_class: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMVolumeTemplate':
|
|
||||||
meta = dictionary.get('metadata', {})
|
|
||||||
spec = dictionary.get('spec', {})
|
|
||||||
storage = spec.get('storage', {})
|
|
||||||
resources = storage.get('resources', {})
|
|
||||||
requests = resources.get('requests', {})
|
|
||||||
return VMVolumeTemplate(
|
|
||||||
name=meta.get('name', ''),
|
|
||||||
storage=requests.get('storage', ''),
|
|
||||||
storage_class=storage.get('storageClassName', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMInterface:
|
|
||||||
name: str
|
|
||||||
model: str
|
|
||||||
mac_address: str
|
|
||||||
state: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMInterface':
|
|
||||||
return VMInterface(
|
|
||||||
name=dictionary.get('name', ''),
|
|
||||||
model=dictionary.get('model', ''),
|
|
||||||
mac_address=dictionary.get('macAddress', ''),
|
|
||||||
state=dictionary.get('state', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMDeviceDisk:
|
|
||||||
name: str
|
|
||||||
boot_order: int
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDeviceDisk':
|
|
||||||
return VMDeviceDisk(
|
|
||||||
name=dictionary.get('name', ''),
|
|
||||||
boot_order=dictionary.get('bootOrder', 0),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMVolume:
|
|
||||||
name: str
|
|
||||||
data_volume: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMVolume':
|
|
||||||
dv = dictionary.get('dataVolume', {})
|
|
||||||
return VMVolume(
|
|
||||||
name=dictionary.get('name', ''),
|
|
||||||
data_volume=dv.get('name', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMDomain:
|
|
||||||
architecture: str
|
|
||||||
disks: list[VMDeviceDisk]
|
|
||||||
interfaces: list[VMInterface]
|
|
||||||
volumes: list[VMVolume]
|
|
||||||
subdomain: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDomain':
|
|
||||||
return VMDomain(
|
|
||||||
architecture=dictionary.get('architecture', ''),
|
|
||||||
disks=[VMDeviceDisk.from_dict(disk) for disk in dictionary.get('domain', {}).get('devices', {}).get('disks', [])],
|
|
||||||
interfaces=[VMInterface.from_dict(iface) for iface in dictionary.get('domain', {}).get('devices', {}).get('interfaces', [])],
|
|
||||||
volumes=[VMVolume.from_dict(vol) for vol in dictionary.get('volumes', [])],
|
|
||||||
subdomain=dictionary.get('subdomain', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMStatus:
|
|
||||||
printable_status: str
|
|
||||||
run_strategy: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMStatus':
|
|
||||||
return VMStatus(
|
|
||||||
printable_status=dictionary.get('printableStatus', ''),
|
|
||||||
run_strategy=dictionary.get('runStrategy', ''),
|
|
||||||
)
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class VMDefinition:
|
|
||||||
metadata: VMMetadata
|
|
||||||
run_strategy: str
|
|
||||||
status: VMStatus
|
|
||||||
volume_template: VMVolumeTemplate
|
|
||||||
domain: VMDomain
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDefinition':
|
|
||||||
spec = dictionary.get('spec', {})
|
|
||||||
template = spec.get('template', {}).get('spec', {})
|
|
||||||
return VMDefinition(
|
|
||||||
metadata=VMMetadata.from_dict(dictionary.get('metadata', {})),
|
|
||||||
run_strategy=spec.get('runStrategy', ''),
|
|
||||||
status=VMStatus.from_dict(dictionary.get('status', {})),
|
|
||||||
volume_template=VMVolumeTemplate.from_dict(spec.get('dataVolumeTemplates', [{}])[0]),
|
|
||||||
domain=VMDomain.from_dict(template),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ManagedFieldType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
fieldsType: str
|
|
||||||
fieldsV1: dict[str, Any]
|
|
||||||
manager: str
|
|
||||||
operation: str
|
|
||||||
time: str
|
|
||||||
subresource: str
|
|
||||||
|
|
||||||
class MetadataType(TypedDict, total=False):
|
|
||||||
name: str
|
|
||||||
namespace: str
|
|
||||||
uid: str
|
|
||||||
resourceVersion: str
|
|
||||||
creationTimestamp: str
|
|
||||||
annotations: dict[str, Any]
|
|
||||||
labels: dict[str, Any]
|
|
||||||
managedFields: list[ManagedFieldType]
|
|
||||||
finalizers: list[str]
|
|
||||||
generation: int
|
|
||||||
|
|
||||||
class VMItemType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
kind: str
|
|
||||||
metadata: MetadataType
|
|
||||||
spec: dict[str, Any]
|
|
||||||
status: dict[str, Any]
|
|
||||||
|
|
||||||
class VMListType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
kind: str
|
|
||||||
items: list[VMItemType]
|
|
||||||
metadata: dict[str, Any]
|
|
||||||
|
|
||||||
class NADSpecType(TypedDict, total=False):
|
|
||||||
config: str
|
|
||||||
|
|
||||||
class NADItemType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
kind: str
|
|
||||||
metadata: MetadataType
|
|
||||||
spec: NADSpecType
|
|
||||||
|
|
||||||
class NADListType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
kind: str
|
|
||||||
items: list[NADItemType]
|
|
||||||
metadata: dict[str, Any]
|
|
||||||
|
|
||||||
class SCItemType(TypedDict, total=False):
|
|
||||||
metadata: MetadataType
|
|
||||||
provisioner: str
|
|
||||||
parameters: dict[str, Any]
|
|
||||||
reclaimPolicy: str
|
|
||||||
volumeBindingMode: str
|
|
||||||
|
|
||||||
class SCListType(TypedDict, total=False):
|
|
||||||
apiVersion: str
|
|
||||||
kind: str
|
|
||||||
items: list[SCItemType]
|
|
||||||
metadata: dict[str, Any]
|
|
||||||
|
Before Width: | Height: | Size: 2.8 KiB After Width: | Height: | Size: 2.8 KiB |
@@ -15,7 +15,7 @@ from django.utils.translation import gettext_noop as _
|
|||||||
from uds.core import types as core_types, consts
|
from uds.core import types as core_types, consts
|
||||||
from uds.core.services import ServiceProvider
|
from uds.core.services import ServiceProvider
|
||||||
from uds.core.ui import gui
|
from uds.core.ui import gui
|
||||||
from uds.core.util import fields
|
from uds.core.util import validators, fields
|
||||||
from uds.core.util.decorators import cached
|
from uds.core.util.decorators import cached
|
||||||
|
|
||||||
from .openshift import client
|
from .openshift import client
|
||||||
@@ -38,66 +38,59 @@ class OpenshiftProvider(ServiceProvider):
|
|||||||
icon_file = 'provider.png'
|
icon_file = 'provider.png'
|
||||||
|
|
||||||
# Gui
|
# Gui
|
||||||
cluster_url = gui.TextField(
|
host = gui.TextField(
|
||||||
order=1,
|
order=1,
|
||||||
length=128,
|
|
||||||
label=_('Cluster OAuth URL'),
|
|
||||||
tooltip=_('Openshift OAuth URL, e.g. https://oauth-openshift.apps-crc.testing'),
|
|
||||||
required=True,
|
|
||||||
default='https://oauth-openshift.apps-crc.testing',
|
|
||||||
)
|
|
||||||
api_url = gui.TextField(
|
|
||||||
order=2,
|
|
||||||
length=128,
|
|
||||||
label=_('API URL'),
|
|
||||||
tooltip=_('Openshift API URL, e.g. https://localhost:6443'),
|
|
||||||
required=True,
|
|
||||||
default='https://localhost:6443',
|
|
||||||
)
|
|
||||||
username = gui.TextField(
|
|
||||||
order=3,
|
|
||||||
length=64,
|
length=64,
|
||||||
|
label=_('Host'),
|
||||||
|
tooltip=_('Openshift Server IP or Hostname'),
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
port = gui.NumericField(
|
||||||
|
order=2,
|
||||||
|
length=3,
|
||||||
|
label=_('Port'),
|
||||||
|
default=443,
|
||||||
|
tooltip=_('Openshift Server Port (default 443)'),
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
verify_ssl = fields.verify_ssl_field(order=3)
|
||||||
|
|
||||||
|
username = gui.TextField(
|
||||||
|
order=4,
|
||||||
|
length=32,
|
||||||
label=_('Username'),
|
label=_('Username'),
|
||||||
tooltip=_('User with valid privileges on Openshift Server'),
|
tooltip=_('User with valid privileges on Openshift Server'),
|
||||||
required=True,
|
required=True,
|
||||||
default='kubeadmin',
|
|
||||||
)
|
)
|
||||||
password = gui.PasswordField(
|
password = gui.PasswordField(
|
||||||
order=4,
|
order=5,
|
||||||
length=64,
|
length=32,
|
||||||
label=_('Password'),
|
label=_('Password'),
|
||||||
tooltip=_('Password of the user of Openshift Server'),
|
tooltip=_('Password of the user of Openshift Server'),
|
||||||
required=True,
|
required=True,
|
||||||
default='Tn5u8-9k9I9-6WF3Y-q5hSB',
|
|
||||||
)
|
)
|
||||||
namespace = gui.TextField(
|
|
||||||
order=5,
|
|
||||||
length=64,
|
|
||||||
label=_('Namespace'),
|
|
||||||
tooltip=_('Openshift namespace to use (default: "default")'),
|
|
||||||
required=True,
|
|
||||||
default='default',
|
|
||||||
)
|
|
||||||
verify_ssl = fields.verify_ssl_field(order=6)
|
|
||||||
concurrent_creation_limit = fields.concurrent_creation_limit_field()
|
concurrent_creation_limit = fields.concurrent_creation_limit_field()
|
||||||
concurrent_removal_limit = fields.concurrent_removal_limit_field()
|
concurrent_removal_limit = fields.concurrent_removal_limit_field()
|
||||||
timeout = fields.timeout_field()
|
timeout = fields.timeout_field()
|
||||||
|
|
||||||
_cached_api: typing.Optional['client.OpenshiftClient'] = None
|
_cached_api: typing.Optional['client.OpenshiftClient'] = None
|
||||||
|
|
||||||
|
# There is more fields type, but not here the best place to cover it
|
||||||
def initialize(self, values: 'core_types.core.ValuesType') -> None:
|
def initialize(self, values: 'core_types.core.ValuesType') -> None:
|
||||||
# No port validation needed, URLs are used
|
if values:
|
||||||
pass
|
self.port.value = validators.validate_port(self.port.value)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def api(self) -> 'client.OpenshiftClient':
|
def api(
|
||||||
|
self,
|
||||||
|
) -> 'client.OpenshiftClient':
|
||||||
if self._cached_api is None:
|
if self._cached_api is None:
|
||||||
self._cached_api = client.OpenshiftClient(
|
self._cached_api = client.OpenshiftClient(
|
||||||
cluster_url=self.cluster_url.value,
|
host=self.host.value,
|
||||||
api_url=self.api_url.value,
|
port=self.port.as_int(),
|
||||||
username=self.username.value,
|
username=self.username.value,
|
||||||
password=self.password.value,
|
password=self.password.value,
|
||||||
namespace=self.namespace.value or 'default',
|
|
||||||
cache=self.cache,
|
cache=self.cache,
|
||||||
timeout=self.timeout.as_int(),
|
timeout=self.timeout.as_int(),
|
||||||
verify_ssl=self.verify_ssl.as_bool(),
|
verify_ssl=self.verify_ssl.as_bool(),
|
||||||
@@ -107,6 +100,23 @@ class OpenshiftProvider(ServiceProvider):
|
|||||||
def test_connection(self) -> bool:
|
def test_connection(self) -> bool:
|
||||||
return self.api.test()
|
return self.api.test()
|
||||||
|
|
||||||
|
# def test_connection(self) -> bool:
|
||||||
|
# return self.api.test()
|
||||||
|
|
||||||
|
# def get_task_info(self, task_id: str) -> nu_types.TaskInfo:
|
||||||
|
# try:
|
||||||
|
# return self.api.get_task_info(task_id)
|
||||||
|
# except nu_exceptions.AcropolisConnectionError:
|
||||||
|
# raise
|
||||||
|
# except Exception as e:
|
||||||
|
# logger.error('Exception obtaining Openshift task info: %s', e)
|
||||||
|
# return nu_types.TaskInfo(
|
||||||
|
# state=nu_types.TaskState.UNKNOWN, error={'error': str(e)}, entities=[]
|
||||||
|
# )
|
||||||
|
|
||||||
|
# def get_macs_range(self) -> str:
|
||||||
|
# return self.macs_range.value
|
||||||
|
|
||||||
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
|
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
|
||||||
def is_available(self) -> bool:
|
def is_available(self) -> bool:
|
||||||
return self.api.test()
|
return self.api.test()
|
||||||
@@ -41,7 +41,6 @@ class OpenshiftTemplatePublication(DynamicPublication, autoserializable.AutoSeri
|
|||||||
# We need to wait for the name te be created, but don't want to loose the _name
|
# We need to wait for the name te be created, but don't want to loose the _name
|
||||||
self._waiting_name = True
|
self._waiting_name = True
|
||||||
instance_info = self.service().api.get_instance_info(self.service().template.value)
|
instance_info = self.service().api.get_instance_info(self.service().template.value)
|
||||||
logger.debug('Instance info for cloning: %s', instance_info)
|
|
||||||
if instance_info.status.is_cloning():
|
if instance_info.status.is_cloning():
|
||||||
self.retry_later()
|
self.retry_later()
|
||||||
elif not instance_info.status.is_cloneable():
|
elif not instance_info.status.is_cloneable():
|
||||||
|
Before Width: | Height: | Size: 2.8 KiB After Width: | Height: | Size: 2.8 KiB |
@@ -97,31 +97,14 @@ class OpenshiftService(DynamicService): # pylint: disable=too-many-public-metho
|
|||||||
|
|
||||||
self.basename.value = validators.validate_basename(self.basename.value, length=self.lenname.as_int())
|
self.basename.value = validators.validate_basename(self.basename.value, length=self.lenname.as_int())
|
||||||
|
|
||||||
def init_gui(self) -> None:
|
|
||||||
self.prov_uuid.value = self.provider().get_uuid()
|
|
||||||
|
|
||||||
vm_items = self.api.enumerate_vms()
|
|
||||||
choices = []
|
|
||||||
logger.debug('VMs found: %s', vm_items)
|
|
||||||
for vm in vm_items:
|
|
||||||
name = vm.get('metadata', {}).get('name', 'UNKNOWN')
|
|
||||||
namespace = vm.get('metadata', {}).get('namespace', '')
|
|
||||||
# Exclude templates whose name starts with 'UDS-'
|
|
||||||
if name.upper().startswith('UDS-'):
|
|
||||||
continue
|
|
||||||
# Show namespace if not default
|
|
||||||
label = f"{name} ({namespace})" if namespace and namespace != 'default' else name
|
|
||||||
choices.append(gui.choice_item(name, label))
|
|
||||||
self.template.set_choices(choices)
|
|
||||||
|
|
||||||
def init_gui(self) -> None:
|
def init_gui(self) -> None:
|
||||||
self.prov_uuid.value = self.provider().get_uuid()
|
self.prov_uuid.value = self.provider().get_uuid()
|
||||||
|
|
||||||
self.template.set_choices(
|
self.template.set_choices(
|
||||||
[
|
[
|
||||||
gui.choice_item(str(template.metadata.uid), f'{template.metadata.name} ({template.metadata.namespace})')
|
gui.choice_item(str(template.id), f'{template.name} ({template.tenant.name})')
|
||||||
for template in self.provider().api.list_vms()
|
for template in self.provider().api.list_instances()
|
||||||
if not template.metadata.name.startswith('UDS-')
|
if template.is_usable() and not template.name.startswith('UDS-')
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -204,7 +187,7 @@ class OpenshiftService(DynamicService): # pylint: disable=too-many-public-metho
|
|||||||
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
|
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Shutdowns the machine, same as stop (both tries soft shutdown, it's a openshift thing)
|
Shutdowns the machine, same as stop (both tries soft shutdown, it's a Openshift thing)
|
||||||
"""
|
"""
|
||||||
self.api.stop_instance(vmid)
|
self.api.stop_instance(vmid)
|
||||||
|
|
||||||
@@ -42,7 +42,7 @@ from .deployment_fixed import OpenshiftUserServiceFixed
|
|||||||
|
|
||||||
# Not imported at runtime, just for type checking
|
# Not imported at runtime, just for type checking
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from .openshift import client
|
from .Openshift import client
|
||||||
from .provider import OpenshiftProvider
|
from .provider import OpenshiftProvider
|
||||||
from uds.core.services.generics.fixed.userservice import FixedUserService
|
from uds.core.services.generics.fixed.userservice import FixedUserService
|
||||||
|
|
||||||
@@ -99,18 +99,18 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
|
|||||||
|
|
||||||
# Uses default FixedService.initialize
|
# Uses default FixedService.initialize
|
||||||
|
|
||||||
|
|
||||||
def init_gui(self) -> None:
|
def init_gui(self) -> None:
|
||||||
self.prov_uuid.value = self.provider().get_uuid()
|
self.prov_uuid.value = self.provider().get_uuid()
|
||||||
|
|
||||||
self.machines.set_choices(
|
self.machines.set_choices(
|
||||||
[
|
[
|
||||||
gui.choice_item(str(machine.metadata.uid), f'{machine.metadata.name} ({machine.metadata.namespace})')
|
gui.choice_item(str(template.id), f'{template.name} ({template.tenant.name})')
|
||||||
for machine in self.provider().api.list_vms()
|
for template in self.provider().api.list_instances()
|
||||||
if not machine.metadata.name.startswith('UDS-')
|
if template.is_usable() and not template.name.startswith('UDS-')
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def provider(self) -> 'OpenshiftProvider':
|
def provider(self) -> 'OpenshiftProvider':
|
||||||
return typing.cast('OpenshiftProvider', super().provider())
|
return typing.cast('OpenshiftProvider', super().provider())
|
||||||
|
|
||||||
@@ -119,17 +119,9 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
|
|||||||
|
|
||||||
def enumerate_assignables(self) -> collections.abc.Iterable[types.ui.ChoiceItem]:
|
def enumerate_assignables(self) -> collections.abc.Iterable[types.ui.ChoiceItem]:
|
||||||
# Obtain machines names and ids for asignables
|
# Obtain machines names and ids for asignables
|
||||||
servers = {}
|
servers = {
|
||||||
vm_items = self.api.enumerate_instances()
|
str(server.id): server.name for server in self.api.list_instances() if not server.name.startswith('UDS-') and server.is_usable()
|
||||||
for vm in vm_items:
|
}
|
||||||
name = vm.get('metadata', {}).get('name', 'UNKNOWN')
|
|
||||||
namespace = vm.get('metadata', {}).get('namespace', '')
|
|
||||||
# Exclude templates whose name starts with 'UDS-'
|
|
||||||
if name.startswith('UDS-'):
|
|
||||||
continue
|
|
||||||
# Show namespace if not default
|
|
||||||
label = f"{name} ({namespace})" if namespace and namespace != 'default' else name
|
|
||||||
servers[name] = label
|
|
||||||
|
|
||||||
with self._assigned_access() as assigned_servers:
|
with self._assigned_access() as assigned_servers:
|
||||||
return [
|
return [
|
||||||
@@ -184,7 +176,7 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
|
|||||||
self.do_log(types.log.LogLevel.INFO, f'Stopping machine {vmid} after logout')
|
self.do_log(types.log.LogLevel.INFO, f'Stopping machine {vmid} after logout')
|
||||||
|
|
||||||
def get_mac(self, vmid: str) -> str:
|
def get_mac(self, vmid: str) -> str:
|
||||||
# No mac on openshift instances, return ip instead
|
# No mac on Openshift instances, return ip instead
|
||||||
return self.api.get_instance_info(vmid).validate().interfaces[0].mac_address if vmid else ''
|
return self.api.get_instance_info(vmid).validate().interfaces[0].mac_address if vmid else ''
|
||||||
|
|
||||||
def get_ip(self, vmid: str) -> str:
|
def get_ip(self, vmid: str) -> str:
|
||||||
Reference in New Issue
Block a user