1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-10-23 23:34:07 +03:00

11 Commits

Author SHA1 Message Date
aschumann-virtualcable
53beb1d0f2 Update subproject commits for actor and client modules 2025-10-22 11:45:34 +02:00
aschumann-virtualcable
1ddbb1120b Merge remote-tracking branch 'origin/v4.0' into dev/andres/v4.0 2025-10-21 16:52:32 +02:00
Adolfo Gómez García
461917e652 Convert transport_grouping and ha_policy to string for consistent data handling 2025-10-21 16:39:48 +02:00
aschumann-virtualcable
33f5c1d34a Merge remote-tracking branch 'origin/master' into dev/andres/v4.0 2025-10-21 10:49:06 +02:00
Adolfo Gómez García
a867ed7b6a Updated actor 2025-10-18 02:17:44 +02:00
aschumann-virtualcable
67bbce579f Add Openshift service provider and related functionalities
- Implemented OpenshiftProvider class to manage Openshift instances.
- Created OpenshiftService and OpenshiftServiceFixed classes for dynamic and fixed services respectively.
- Added OpenshiftTemplatePublication for managing instance publications.
- Introduced GUI fields for configuration such as host, port, username, and password.
- Implemented connection testing and API interaction methods.
- Added image assets for service and provider icons.
- Included error handling and logging for better traceability.
2025-10-16 17:32:04 +02:00
aschumann-virtualcable
cdd5f3c1d1 Merge remote-tracking branch 'origin/v4.0' into dev/andres/v4.0 2025-10-02 11:17:39 +02:00
aschumann-virtualcable
2731013710 fix v4.0 actor 2025-09-25 19:18:53 +02:00
aschumann-virtualcable
39aa842d7f Merge branch 'dev/andres/v4.0' of github.com:VirtualCable/openuds into dev/andres/v4.0 2025-09-25 19:10:05 +02:00
Adolfo Gómez García
99b33d0603 Reset actor 2025-09-25 19:02:22 +02:00
aschumann-virtualcable
398ec1aac0 Merge remote-tracking branch 'origin/v4.0' into dev/andres/v4.0 2025-09-25 12:13:11 +02:00
18 changed files with 620 additions and 887 deletions

2
actor

Submodule actor updated: 4638fd77a2...04ce3fc2d1

2
client

Submodule client updated: 517f8935a2...5b044bca34

View File

@@ -158,27 +158,29 @@ class MetaPools(ModelHandler[MetaPoolItem]):
(i.pool.userServices.filter(state=State.PREPARING).count()) for i in all_pools (i.pool.userServices.filter(state=State.PREPARING).count()) for i in all_pools
) )
return MetaPoolItem( val = {
id=item.uuid, 'id': item.uuid,
name=item.name, 'name': item.name,
short_name=item.short_name, 'short_name': item.short_name,
tags=[tag.tag for tag in item.tags.all()], 'tags': [tag.tag for tag in item.tags.all()],
comments=item.comments, 'comments': item.comments,
thumb=item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64, 'thumb': item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64,
image_id=item.image.uuid if item.image is not None else None, 'image_id': item.image.uuid if item.image is not None else None,
servicesPoolGroup_id=pool_group_id, 'servicesPoolGroup_id': pool_group_id,
pool_group_name=pool_group_name, 'pool_group_name': pool_group_name,
pool_group_thumb=pool_group_thumb, 'pool_group_thumb': pool_group_thumb,
user_services_count=userservices_total, 'user_services_count': userservices_total,
user_services_in_preparation=userservices_in_preparation, 'user_services_in_preparation': userservices_in_preparation,
visible=item.visible, 'visible': item.visible,
policy=str(item.policy), 'policy': str(item.policy),
fallbackAccess=item.fallbackAccess, 'fallbackAccess': item.fallbackAccess,
permission=permissions.effective_permissions(self._user, item), 'permission': permissions.effective_permissions(self._user, item),
calendar_message=item.calendar_message, 'calendar_message': item.calendar_message,
transport_grouping=item.transport_grouping, 'transport_grouping': str(item.transport_grouping),
ha_policy=str(item.ha_policy), 'ha_policy': str(item.ha_policy),
) }
return val
# Gui related # Gui related
def get_gui(self, for_type: str) -> list[types.ui.GuiElement]: def get_gui(self, for_type: str) -> list[types.ui.GuiElement]:

View File

@@ -1,580 +0,0 @@
import token
from typing import List, Dict, Any, Tuple
from py import log
from pyparsing import col
def print_resource_names(items: List[Dict[str, Any]], title: str) -> None:
    """Log the ``metadata.name`` of each resource in *items*, numbered from 1.

    Resources without a name are reported as ``UNKNOWN``. Output goes to the
    root logger at INFO level; nothing is returned.
    """
    logging.info("list of %s:", title)
    counter = 1
    for entry in items:
        resource_name = entry.get('metadata', {}).get('name', 'UNKNOWN')
        logging.info("%d - %s", counter, resource_name)
        counter += 1
#
# Copyright (c) 2025 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import typing
import datetime
import urllib.parse
import logging
import requests
import time
from uds.core.util import security
from uds.core.util.cache import Cache
from uds.core.util.decorators import cached
from uds.core.util.model import sql_now
from . import types, consts, exceptions
logger = logging.getLogger(__name__)
class OpenshiftClient:
    """Thin REST client for an OpenShift/KubeVirt cluster.

    Wraps token-based authentication plus the virtualmachine / datavolume
    endpoints the UDS service layer needs.
    """

    # Connection/identity settings provided by the provider configuration.
    cluster_url: str
    api_url: str
    username: str
    password: str
    namespace: str
    _verify_ssl: bool
    _timeout: int
    _token_expiry: datetime.datetime
    # Lazily-created requests session; rebuilt when credentials change.
    _session: typing.Optional[requests.Session] = None
    cache: typing.Optional['Cache']

    def __init__(
        self,
        cluster_url: str,
        api_url: str,
        username: str,
        password: str,
        namespace: str = 'default',
        timeout: int = 5,
        verify_ssl: bool = False,
        cache: typing.Optional['Cache'] = None,
    ) -> None:
        """Store connection parameters; performs no network I/O.

        Args:
            cluster_url: Base URL of the cluster (used for OAuth).
            api_url: Base URL of the Kubernetes API server.
            username: Account used to obtain tokens.
            password: Password for *username*.
            namespace: Namespace all requests are scoped to.
            timeout: Per-request timeout, in seconds.
            verify_ssl: Whether TLS certificates are verified.
            cache: Optional UDS cache used by @cached lookups.
        """
        self.cluster_url = cluster_url
        self.api_url = api_url
        self.username: str = username
        self.password: str = password
        self.namespace: str = namespace
        self._verify_ssl: bool = verify_ssl
        self._timeout: int = timeout
        self.cache = cache
        self._access_token = ''
        # datetime.min forces a token fetch on first use.
        self._token_expiry = datetime.datetime.min
    @property
    def session(self) -> requests.Session:
        """A ready-to-use authenticated session (delegates to connect())."""
        return self.connect()

    def connect(self, force: bool = False) -> requests.Session:
        """Create a requests session with JSON + Authorization headers.

        NOTE(review): the Authorization header carries a hard-coded bearer
        token (testing leftover) and ``force`` is ignored — a fresh session
        is built on every call. Replace with get_token() before production.
        """
        # For testing, always use the fixed token
        session = self._session = security.secure_requests_session(verify=self._verify_ssl)
        session.headers.update(
            {
                'Accept': 'application/json',
                'Content-Type': 'application/json',
                'Authorization': 'Bearer sha256~82_k2GpPJ4som5Vfy3P6kcQgeeHdxo-_txXR760K_tM',
            }
        )
        return session
    def get_token(self) -> str | None:
        """Return a bearer token for the cluster.

        NOTE(review): returns a hard-coded token immediately; everything after
        the first ``return`` is unreachable dead code kept from the real OAuth
        implementation (it also logs via the root ``logging`` module in
        Spanish rather than this module's ``logger``). Restore it and drop the
        fixed token before production.
        """
        return "sha256~82_k2GpPJ4som5Vfy3P6kcQgeeHdxo-_txXR760K_tM"
        # --- unreachable below this line (real implementation) ---
        try:
            url = f"{self.cluster_url}/oauth/authorize?client_id=openshift-challenging-client&response_type=token"
            r = requests.get(url, auth=(self.username, self.password), timeout=15, allow_redirects=True, verify=False)
            if "access_token=" not in r.url:
                raise Exception("No se encontró access_token en la URL de respuesta")
            token = r.url.split("access_token=")[1].split("&")[0]
            return token
        except Exception as ex:
            logging.error(f"No se pudo obtener token: {ex}")
            raise
def get_api_url(self, path: str, *parameters: tuple[str, str]) -> str:
url = self.api_url + path
if parameters:
url += '?' + urllib.parse.urlencode(
parameters, doseq=True, safe='[]', quote_via=urllib.parse.quote_plus
)
return url
    def do_request(
        self,
        method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
        path: str,
        *parameters: tuple[str, str],
        data: typing.Any = None,
        check_for_success: bool = False,
    ) -> typing.Any:
        """Issue one HTTP request against the API and return the JSON body.

        Args:
            method: HTTP verb to use.
            path: API path, appended to the base API url.
            *parameters: (name, value) query-string pairs.
            data: JSON-serializable body for POST/PUT.
            check_for_success: When True, also require a truthy ``success``
                key in the JSON response.

        Raises:
            exceptions.OpenshiftConnectionError: network-level failure.
            exceptions.OpenshiftAuthError: HTTP 401 (token dropped first).
            exceptions.OpenshiftPermissionError: HTTP 403.
            exceptions.OpenshiftNotFoundError: HTTP 404.
            exceptions.OpenshiftError: other HTTP errors, bad JSON, or
                ``success`` missing/falsy when check_for_success is set.
        """
        logger.debug(
            'Requesting %s %s with parameters %s and data %s',
            method.upper(),
            path,
            parameters,
            data,
        )
        try:
            match method:
                case 'GET':
                    response = self.session.get(
                        self.get_api_url(path, *parameters),
                        timeout=self._timeout,
                    )
                case 'POST':
                    response = self.session.post(
                        self.get_api_url(path, *parameters),
                        json=data,
                        timeout=self._timeout,
                    )
                case 'PUT':
                    response = self.session.put(
                        self.get_api_url(path, *parameters),
                        json=data,
                        timeout=self._timeout,
                    )
                case 'DELETE':
                    response = self.session.delete(
                        self.get_api_url(path, *parameters),
                        timeout=self._timeout,
                    )
                case _:
                    raise ValueError(f'Unsupported HTTP method: {method}')
        except requests.ConnectionError as e:
            raise exceptions.OpenshiftConnectionError(str(e))
        except requests.RequestException as e:
            raise exceptions.OpenshiftError(f'Error during request: {str(e)}')
        logger.debug('Request result to %s: %s -- %s', path, response.status_code, response.content[:64])
        if not response.ok:
            if response.status_code == 401:
                # Unauthorized, try to refresh the token
                logger.debug('Unauthorized request, refreshing token')
                # Drop the cached session so the next request re-authenticates.
                self._session = None
                raise exceptions.OpenshiftAuthError(
                    'Unauthorized request, please check your credentials or token expiry'
                )
            elif response.status_code == 403:
                # Forbidden, user does not have permissions
                logger.debug('Forbidden request, check your permissions')
                raise exceptions.OpenshiftPermissionError('Forbidden request, please check your permissions')
            elif response.status_code == 404:
                # Not found, resource does not exist
                logger.debug('Resource not found: %s', path)
                raise exceptions.OpenshiftNotFoundError(f'Resource not found: {path}')
            error_message = f'Error on request {method.upper()} {path}: {response.status_code} - {response.content.decode("utf8")[:128]}'
            logger.debug(error_message)
            raise exceptions.OpenshiftError(error_message)
        try:
            data = response.json()
        except Exception as e:
            error_message = f'Error parsing JSON response from {method.upper()} {path}: {str(e)}'
            logger.debug(error_message)
            raise exceptions.OpenshiftError(error_message)
        if check_for_success and not data.get('success', False):
            error_message = f'Error on request {method.upper()} {path}: {data.get("error", "Unknown error")}'
            logger.debug(error_message)
            raise exceptions.OpenshiftError(error_message)
        return data
def do_paginated_request(
self,
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
path: str,
key: str,
*parameters: tuple[str, str],
data: typing.Any = None,
) -> typing.Iterator[typing.Any]:
"""
Make a paginated request to the Openshift API.
Args:
method (str): HTTP method to use (GET, POST, PUT, DELETE)
path (str): API endpoint path
*parameters: Additional parameters to include in the request
data (Any): Data to send with the request (for POST/PUT)
Yields:
typing.Any: The JSON response from each page of the request
Note:
The responses has also the "meta" key, which contains pagination information:
offset: int64
max: int64
size: int64
total: int64
This information is used to determine if there are more pages to fetch.
If not present, we try our best by counting the number of items returned
and comparing it with the items requested per page (consts.MAX_ITEMS_PER_REQUEST).
"""
offset = 0
while True:
params: list[tuple[str, str]] = [i for i in parameters] + [
('max', str(consts.MAX_ITEMS_PER_REQUEST)),
('offset', str(offset)),
]
response = self.do_request(method, path, *params, data=data)
data = response.get(key, [])
yield from data
# Checke meta information to see if we have more pages
meta = response.get('meta', {})
if not meta: # Do our best to avoid errors if meta is not present
# Check if we have more pages
if len(data) < consts.MAX_ITEMS_PER_REQUEST:
break
elif meta.get('offset', 0) + meta.get('size', 0) >= meta.get('total', 0):
# No more pages, as offset is greater than or equal to total
break
offset += consts.MAX_ITEMS_PER_REQUEST
#* --- OpenShift resource Methods ---
    def get_vm_by_name(self, vm_name: str) -> types.VMDefinition | None:
        """Fetch a single VirtualMachine by name within the current namespace.

        Returns None both when the VM does not exist and on any other error
        (errors are logged at INFO level, never raised).
        """
        path = f"apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines/{vm_name}"
        try:
            response = self.do_request('GET', path)
            return types.VMDefinition.from_dict(response)  # Convert raw dict to a VMDefinition here
        except exceptions.OpenshiftNotFoundError:
            return None
        except Exception as e:
            logger.info(f"Error getting VM {vm_name}: {e}")
            return None
    def monitor_vm_clone(self, api_url: str, namespace: str, clone_name: str, polling_interval: int = 5) -> None:
        """Poll a VirtualMachineClone resource until it succeeds, fails or vanishes.

        Blocks the caller, sleeping *polling_interval* seconds between polls.
        NOTE(review): there is no overall timeout — if the clone never reaches
        a terminal phase and keeps returning non-200/404, this loops forever.
        Uses the root ``logging`` module, unlike the rest of this class.
        """
        clone_url = f"{api_url}/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}"
        headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
        logging.info("Monitoring clone process for '%s'...", clone_name)
        while True:
            try:
                response = requests.get(clone_url, headers=headers, verify=False)
                if response.status_code == 200:
                    clone_data = response.json()
                    status = clone_data.get('status', {})
                    phase = status.get('phase', 'Unknown')
                    logging.info("Phase: %s", phase)
                    # Surface every status condition for traceability.
                    for condition in status.get('conditions', []):
                        ctype = condition.get('type', '')
                        cstatus = condition.get('status', '')
                        cmsg = condition.get('message', '')
                        logging.info(" %s: %s - %s", ctype, cstatus, cmsg)
                    if phase == 'Succeeded':
                        logging.info("Clone '%s' completed successfully!", clone_name)
                        break
                    elif phase == 'Failed':
                        logging.error("Clone '%s' failed!", clone_name)
                        break
                elif response.status_code == 404:
                    # Clone object already garbage-collected; stop polling.
                    logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name)
                    break
                else:
                    logging.error("Error monitoring clone: %d", response.status_code)
            except Exception as e:
                # Transient errors are logged and polling continues.
                logging.error("Monitoring exception: %s", e)
            logging.info("Waiting %d seconds before next check...", polling_interval)
            time.sleep(polling_interval)
def get_vm_pvc_or_dv_name(self, api_url: str, namespace: str, vm_name: str) -> Tuple[str, str]:
vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
response = requests.get(vm_url, headers=headers, verify=False)
if response.status_code == 200:
vm_obj = response.json()
volumes = vm_obj.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", [])
for vol in volumes:
pvc = vol.get("persistentVolumeClaim")
if pvc:
return pvc.get("claimName"), "pvc"
dv = vol.get("dataVolume")
if dv:
return dv.get("name"), "dv"
raise Exception(f"No PVC or DataVolume found in VM {vm_name}")
    def get_datavolume_size(self, api_url: str, namespace: str, dv_name: str) -> str:
        """Return the storage size of a DataVolume (e.g. '30Gi').

        Prefers ``status.amount``, falling back to the requested storage in
        ``spec.pvc.resources.requests.storage`` ('' if absent).

        Raises:
            Exception: when the DataVolume cannot be fetched (message in
                Spanish, kept verbatim).
        """
        url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}"
        headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
        response = requests.get(url, headers=headers, verify=False)
        if response.status_code == 200:
            dv = response.json()
            size = dv.get("status", {}).get("amount", None)
            if size:
                return size
            return dv.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or ""
        raise Exception(f"No se pudo obtener el tamaño del DataVolume {dv_name}")
def get_pvc_size(self, api_url: str, namespace: str, pvc_name: str) -> str:
url = f"{api_url}/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
pvc = response.json()
capacity = pvc.get("status", {}).get("capacity", {}).get("storage")
if capacity:
return capacity
raise Exception(f"No se pudo obtener el tamaño del PVC {pvc_name}")
def clone_pvc_with_datavolume(self, api_url: str, namespace: str, source_pvc_name: str, cloned_pvc_name: str, storage_class: str, storage_size: str) -> bool:
dv_url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
body: Dict[str, Any] = {
"apiVersion": "cdi.kubevirt.io/v1beta1",
"kind": "DataVolume",
"metadata": {"name": cloned_pvc_name, "namespace": namespace},
"spec": {
"source": {
"pvc": {"name": source_pvc_name, "namespace": namespace}
},
"pvc": {
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": storage_size}},
"storageClassName": storage_class
}
}
}
response = requests.post(dv_url, headers=headers, json=body, verify=False)
if response.status_code == 201:
logging.info(f"DataVolume '{cloned_pvc_name}' creado exitosamente")
return True
logging.error(f"Fallo al crear DataVolume: {response.status_code} {response.text}")
return False
def create_vm_from_pvc(self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, new_dv_name: str, source_pvc_name: str) -> bool:
original_vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
resp = requests.get(original_vm_url, headers=headers, verify=False)
if resp.status_code != 200:
logging.error(f"No se pudo obtener la VM origen: {resp.status_code} {resp.text}")
return False
vm_obj = resp.json()
vm_obj['metadata']['name'] = new_vm_name
for k in ['resourceVersion', 'uid', 'selfLink']:
vm_obj['metadata'].pop(k, None)
vm_obj.pop('status', None)
vm_obj['spec'].pop('running', None)
vm_obj['spec']['runStrategy'] = 'Always'
for vol in vm_obj['spec']['template']['spec']['volumes']:
if 'dataVolume' in vol:
vol['dataVolume']['name'] = new_dv_name
elif 'persistentVolumeClaim' in vol:
vol['persistentVolumeClaim']['claimName'] = new_dv_name
vm_obj['spec']['dataVolumeTemplates'] = [{
"metadata": {"name": new_dv_name},
"spec": {
"source": {"pvc": {"name": source_pvc_name}},
"pvc": {
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": "399Gi"}},
"storageClassName": "crc-csi-hostpath-provisioner"
}
}
}]
interfaces = vm_obj.get('spec', {}).get('template', {}).get('spec', {}).get('domain', {}).get('devices', {}).get('interfaces', [])
for iface in interfaces:
iface.pop('macAddress', None)
create_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines"
headers["Content-Type"] = "application/json"
resp = requests.post(create_url, headers=headers, json=vm_obj, verify=False)
if resp.status_code == 201:
logging.info(f"VM '{new_vm_name}' creada correctamente con DataVolumeTemplate.")
return True
logging.error(f"Error creando VM: {resp.status_code} {resp.text}")
return False
def wait_for_datavolume_clone_progress(self, api_url: str, namespace: str, datavolume_name: str, timeout: int = 3000, polling_interval: int = 5) -> bool:
url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
start = time.time()
while time.time() - start < timeout:
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
dv = response.json()
status = dv.get('status', {})
phase = status.get('phase')
progress = status.get('progress', 'N/A')
logging.info(f"DataVolume {datavolume_name} status: {phase}, progress: {progress}")
if phase == 'Succeeded':
logging.info(f"DataVolume {datavolume_name} clonación completada")
return True
elif phase == 'Failed':
logging.error(f"DataVolume {datavolume_name} clonación fallida")
return False
else:
logging.error(f"Error consultando DataVolume {datavolume_name}: {response.status_code}")
time.sleep(polling_interval)
logging.error(f"Timeout esperando clonación de DataVolume {datavolume_name}")
return False
def start_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/merge-patch+json",
"Accept": "application/json"
}
body: Dict[str, Any] = {
"spec": {
"runStrategy": "Always"
}
}
response = requests.patch(url, headers=headers, json=body, verify=False)
if response.status_code in [200, 201]:
logging.info(f"VM {vm_name} puesta en marcha.")
return True
else:
logging.info(f"Error al arrancar la VM {vm_name}: {response.status_code} - {response.text}")
return False
def stop_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/merge-patch+json",
"Accept": "application/json"
}
body: Dict[str, Any] = {
"spec": {
"runStrategy": "Halted"
}
}
response = requests.patch(url, headers=headers, json=body, verify=False)
if response.status_code in [200, 201]:
logging.info(f"VM {vm_name} se apagará.")
return True
else:
logging.info(f"Error al apagar la VM {vm_name}: {response.status_code} - {response.text}")
return False
    def copy_vm_same_size(self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, storage_class: str) -> None:
        """Clone a VM keeping the source disk size.

        Looks up the source VM's backing volume, clones its PVC via a
        DataVolume of identical size, then creates the new VM on top of it.
        NOTE(review): vol_type is unused; on clone failure only an error is
        logged (Spanish message kept verbatim) — nothing is raised.
        """
        source_pvc_name, vol_type = self.get_vm_pvc_or_dv_name(api_url, namespace, source_vm_name)
        size = self.get_pvc_size(api_url, namespace, source_pvc_name)
        new_pvc_name = f"{new_vm_name}-disk"
        if self.clone_pvc_with_datavolume(api_url, namespace, source_pvc_name, new_pvc_name, storage_class, size):
            self.create_vm_from_pvc(api_url, namespace, source_vm_name, new_vm_name, new_pvc_name, source_pvc_name)
        else:
            logging.error("Error al clonar PVC")
# @cached('test', consts.CACHE_VM_INFO_DURATION)
def test(self) -> bool:
# Simple test: try to enumerate VMs to check connectivity and authentication
try:
vm_url = f"{self.api_url}/apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines"
headers = {'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/json'}
response = requests.get(vm_url, headers=headers, verify=self._verify_ssl, timeout=self._timeout)
response.raise_for_status()
logger.debug('Successfully enumerated VMs for test')
return True
except Exception as e:
logger.error(f"Error testing Openshift by enumerating VMs: {e}")
raise exceptions.OpenshiftConnectionError(str(e)) from e
return False
def enumerate_vms(self) -> collections.abc.Iterator[types.VMDefinition]:
"""
Fetch all VMs from KubeVirt API in the current namespace as VMDefinition objects using do_paginated_request.
"""
yield from (
types.VMDefinition.from_dict(vm)
for vm in self.do_paginated_request(
'GET',
f'apis/kubevirt.io/v1/namespaces/{self.namespace}/virtualmachines',
'items'
)
)
    @cached('vms', consts.CACHE_INFO_DURATION)
    def list_vms(self) -> list[types.VMDefinition]:
        """
        List all VMs in the current namespace as VMDefinition objects.
        Result is cached for consts.CACHE_INFO_DURATION via the @cached decorator.
        """
        return list(self.enumerate_vms())
    @cached('instance', consts.CACHE_VM_INFO_DURATION)
    def get_instance_info(self, vm_name: str, force: bool = False) -> types.VMDefinition | None:
        """
        Get a specific VM by name in the current namespace.
        Returns the VMDefinition if found, else None.

        NOTE(review): return annotation corrected — get_vm_by_name yields a
        VMDefinition, not a dict. ``force`` is accepted but not used here;
        presumably consumed by the @cached decorator — confirm.
        """
        return self.get_vm_by_name(vm_name)
    def start_instance(self, vm_name: str) -> bool:
        """
        Start a specific VM by name (thin wrapper over start_vm using this
        client's api_url and namespace).
        Returns True if started successfully, False otherwise.
        """
        return self.start_vm(self.api_url, self.namespace, vm_name)

    def stop_instance(self, vm_name: str) -> bool:
        """
        Stop a specific VM by name (thin wrapper over stop_vm using this
        client's api_url and namespace).
        Returns True if stopped successfully, False otherwise.
        """
        return self.stop_vm(self.api_url, self.namespace, vm_name)
    def clone_instance(self, source_vm_name: str, new_vm_name: str, storage_class: str) -> bool:
        """
        Clone a VM by name, creating a new VM with the same size.
        Returns True if clone succeeded, False otherwise.

        NOTE(review): failures are logged via the root ``logging`` module
        (unlike the module-level ``logger`` used elsewhere) and swallowed.
        """
        try:
            self.copy_vm_same_size(self.api_url, self.namespace, source_vm_name, new_vm_name, storage_class)
            return True
        except Exception as e:
            logging.error(f"Error cloning VM: {e}")
            return False
@staticmethod
def validate_vm_id(vm_id: str | int) -> None:
try:
int(vm_id)
except ValueError:
raise exceptions.OpenshiftNotFoundError(f'VM {vm_id} not found')

View File

@@ -9,4 +9,4 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
# pyright: reportUnusedImport=false # pyright: reportUnusedImport=false
from uds.core import managers from uds.core import managers
from .provider import OpenshiftProvider from .provider import MorpheusProvider

View File

@@ -101,20 +101,29 @@ class OpenshiftUserService(DynamicUserService, autoserializable.AutoSerializable
def op_create_checker(self) -> types.states.TaskState: def op_create_checker(self) -> types.states.TaskState:
""" """
Checks the state of a deploy for a user or cache, robust and delegando a OpenshiftClient. Checks the state of a deploy for an user or cache
""" """
# Wait until we have created the vm is _name is not emtpy
if self._waiting_name: if self._waiting_name:
# Buscar la VM clonada por nombre usando enumerate_instances found_vms = self.service().api.list_instances(name=self._vm_name(), force=True) # Do not use cache
vms = self.service().api.enumerate_instances() if not found_vms:
found = [vm for vm in vms if vm.get('metadata', {}).get('name') == self._vm_name()]
if not found:
return types.states.TaskState.RUNNING return types.states.TaskState.RUNNING
self._vmid = found[0].get('metadata', {}).get('uid', '') else: # We have the instance, clear _waiting_name and set _vmid
self._waiting_name = False self._vmid = str(found_vms[0].id)
self._waiting_name = False
instance = self.service().api.get_instance_info(self._vmid) instance = self.service().api.get_instance_info(self._vmid)
if not instance.interfaces or getattr(instance.interfaces[0], 'mac_address', '') == '':
# If the instace has a mac address, consider it as created and
# let Openshift to finish the provisioning as it needs to do
if not instance.interfaces or instance.interfaces[0].mac_address == '':
# No interfaces, so we cannot use it yet
return types.states.TaskState.RUNNING return types.states.TaskState.RUNNING
# if not instance.status.is_provisioning():
# # If instance is running, we can finish the publication
# return types.states.TaskState.FINISHED
return types.states.TaskState.FINISHED return types.states.TaskState.FINISHED
# In fact, we probably don't need to check task status, but this way we can include the error # In fact, we probably don't need to check task status, but this way we can include the error

View File

@@ -35,7 +35,7 @@ from uds.core import types
from uds.core.services.generics.fixed.userservice import FixedUserService from uds.core.services.generics.fixed.userservice import FixedUserService
from uds.core.util import autoserializable from uds.core.util import autoserializable
from .openshift import types as morph_types from .openshift import types as openshift_types
# Not imported at runtime, just for type checking # Not imported at runtime, just for type checking
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@@ -82,7 +82,7 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
self.service().api.stop_instance(self._vmid) self.service().api.stop_instance(self._vmid)
# Check methods # Check methods
def _check_status(self, *status: morph_types.InstanceStatus) -> types.states.TaskState: def _check_status(self, *status: openshift_types.InstanceStatus) -> types.states.TaskState:
""" """
Checks the status of the instance and returns the appropriate TaskState. Checks the status of the instance and returns the appropriate TaskState.
""" """
@@ -101,8 +101,8 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
Checks if machine has started Checks if machine has started
""" """
return self._check_status( return self._check_status(
morph_types.InstanceStatus.RUNNING, openshift_types.InstanceStatus.RUNNING,
morph_types.InstanceStatus.PROVISIONING, openshift_types.InstanceStatus.PROVISIONING,
) )
def op_stop_checker(self) -> types.states.TaskState: def op_stop_checker(self) -> types.states.TaskState:
@@ -110,6 +110,6 @@ class OpenshiftUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
Checks if machine has stoped Checks if machine has stoped
""" """
return self._check_status( return self._check_status(
morph_types.InstanceStatus.STOPPED, openshift_types.InstanceStatus.STOPPED,
morph_types.InstanceStatus.SUSPENDED, openshift_types.InstanceStatus.SUSPENDED,
) )

View File

@@ -0,0 +1,511 @@
#
# Copyright (c) 2025 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import typing
import datetime
import urllib.parse
import logging
from uds.core.util import security
from uds.core.util.cache import Cache
from uds.core.util.decorators import cached
from uds.core.util.model import sql_now
from . import types, consts, exceptions
import requests
logger = logging.getLogger(__name__)
# caching helper
def caching_key_helper(obj: 'OpenshiftClient') -> str:
    """Key function for @cached: scope cache entries to the client's host."""
    return obj._host  # pylint: disable=protected-access
class OpenshiftClient:
    """OAuth password-grant REST client for the Openshift provider API."""

    # Connection/identity settings from the provider configuration.
    _host: str
    _port: int
    _username: str
    _password: str
    _verify_ssl: bool
    _timeout: int
    # Base URL derived from host/port; trailing slash included.
    _url: str
    # Current bearer token and its expiry; refreshed by connect().
    _access_token: str
    _token_expiry: datetime.datetime
    _session: typing.Optional[requests.Session] = None
    cache: typing.Optional['Cache']

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        timeout: int = 5,
        verify_ssl: bool = False,
        cache: typing.Optional['Cache'] = None,
        client_id: str | None = None,
    ) -> None:
        """Store connection parameters; no network I/O happens here.

        Args:
            host: API server hostname.
            port: API server port (omitted from the URL when 443).
            username: Account for the OAuth password grant.
            password: Password for *username*.
            timeout: Per-request timeout in seconds.
            verify_ssl: Whether TLS certificates are verified.
            cache: Optional UDS cache for decorated lookups.
            client_id: OAuth client id.
                NOTE(review): defaults to 'morph-api' — looks like a leftover
                from the Morpheus provider this module was derived from;
                confirm the correct OpenShift client id.
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._verify_ssl = verify_ssl
        self._timeout = timeout
        self._client_id = client_id or 'morph-api'
        self.cache = cache
        self._access_token = self._refresh_token = ''
        # datetime.min forces a token fetch on first use.
        self._token_expiry = datetime.datetime.min
        self._url = f'https://{host}' + (f':{port}' if port != 443 else '') + '/'
    @property
    def session(self) -> requests.Session:
        """An authenticated session, reusing a valid cached token when possible."""
        return self.connect()

    def connect(self, force: bool = False) -> requests.Session:
        """Return a requests session holding a valid bearer token.

        Reuses the cached session while the token is unexpired; otherwise
        performs an OAuth password-grant request and stores the new token.

        Args:
            force: When True, re-authenticate even if the token is still valid.

        Raises:
            exceptions.OpenshiftAuthError: the token endpoint rejected us.
            exceptions.OpenshiftConnectionError: network-level failure.
        """
        now = sql_now()
        # Fast path: session and token both still valid.
        if self._access_token and self._session and self._token_expiry > now and not force:
            return self._session
        session = self._session = security.secure_requests_session(verify=self._verify_ssl)
        self._session.headers.update(
            {
                'Accept': 'application/json',
                'Content-Type': 'application/json',
            }
        )

        def set_auth_header() -> requests.Session:
            """Set the Authorization header with the given token."""
            session.headers.update({'Authorization': f'Bearer {self._access_token}'})
            return session

        # If we have an access token, use it as bearer token
        # If the token is expired, we will refresh it
        # If force is True, we will refresh the token even if it is not expired
        if self._access_token and self._token_expiry > now and not force:
            return set_auth_header()
        try:
            result = session.post(
                # ?client_id=morph-api&grant_type=password&scope=write
                url=self.get_api_url(
                    'oauth/token',
                    ('client_id', self._client_id),
                    ('grant_type', 'password'),
                    ('scope', 'write'),
                ),
                data={
                    'username': self._username,
                    'password': self._password,
                },
                headers={
                    'Accept': 'application/json',
                    # Token endpoint expects form-encoded credentials, not JSON.
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                timeout=self._timeout,
            )
            if not result.ok:
                raise exceptions.OpenshiftAuthError(result.content.decode('utf8'))
            data = result.json()
            self._access_token = data['access_token']
            # self._refresh_token = data['refresh_token'] # Not used, but could be used to refresh the token
            self._token_expiry = datetime.datetime.now() + datetime.timedelta(
                seconds=data.get('expires_in', 3600)
            )  # Default to 1 hour if not provided
            return set_auth_header()
        except requests.RequestException as e:
            raise exceptions.OpenshiftConnectionError(str(e)) from e
def get_api_url(self, path: str, *parameters: tuple[str, str]) -> str:
url = self._url + path
if parameters:
url += '?' + urllib.parse.urlencode(
parameters, doseq=True, safe='[]', quote_via=urllib.parse.quote_plus
)
return url
def do_request(
self,
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
path: str,
*parameters: tuple[str, str],
data: typing.Any = None,
check_for_success: bool = False,
) -> typing.Any:
logger.debug(
'Requesting %s %s with parameters %s and data %s',
method.upper(),
path,
parameters,
data,
)
try:
match method:
case 'GET':
response = self.session.get(
self.get_api_url(path, *parameters),
timeout=self._timeout,
)
case 'POST':
response = self.session.post(
self.get_api_url(path, *parameters),
json=data,
timeout=self._timeout,
)
case 'PUT':
response = self.session.put(
self.get_api_url(path, *parameters),
json=data,
timeout=self._timeout,
)
case 'DELETE':
response = self.session.delete(
self.get_api_url(path, *parameters),
timeout=self._timeout,
)
case _:
raise ValueError(f'Unsupported HTTP method: {method}')
except requests.ConnectionError as e:
raise exceptions.OpenshiftConnectionError(str(e))
except requests.RequestException as e:
raise exceptions.OpenshiftError(f'Error during request: {str(e)}')
logger.debug('Request result to %s: %s -- %s', path, response.status_code, response.content[:64])
if not response.ok:
if response.status_code == 401:
# Unauthorized, try to refresh the token
logger.debug('Unauthorized request, refreshing token')
self._session = None
raise exceptions.OpenshiftAuthError(
'Unauthorized request, please check your credentials or token expiry'
)
elif response.status_code == 403:
# Forbidden, user does not have permissions
logger.debug('Forbidden request, check your permissions')
raise exceptions.OpenshiftPermissionError('Forbidden request, please check your permissions')
elif response.status_code == 404:
# Not found, resource does not exist
logger.debug('Resource not found: %s', path)
raise exceptions.OpenshiftNotFoundError(f'Resource not found: {path}')
error_message = f'Error on request {method.upper()} {path}: {response.status_code} - {response.content.decode("utf8")[:128]}'
logger.debug(error_message)
raise exceptions.OpenshiftError(error_message)
try:
data = response.json()
except Exception as e:
error_message = f'Error parsing JSON response from {method.upper()} {path}: {str(e)}'
logger.debug(error_message)
raise exceptions.OpenshiftError(error_message)
if check_for_success and not data.get('success', False):
error_message = f'Error on request {method.upper()} {path}: {data.get("error", "Unknown error")}'
logger.debug(error_message)
raise exceptions.OpenshiftError(error_message)
return data
def do_paginated_request(
self,
method: typing.Literal['GET', 'POST', 'PUT', 'DELETE'],
path: str,
key: str,
*parameters: tuple[str, str],
data: typing.Any = None,
) -> typing.Iterator[typing.Any]:
"""
Make a paginated request to the Openshift API.
Args:
method (str): HTTP method to use (GET, POST, PUT, DELETE)
path (str): API endpoint path
*parameters: Additional parameters to include in the request
data (Any): Data to send with the request (for POST/PUT)
Yields:
typing.Any: The JSON response from each page of the request
Note:
The responses has also the "meta" key, which contains pagination information:
offset: int64
max: int64
size: int64
total: int64
This information is used to determine if there are more pages to fetch.
If not present, we try our best by counting the number of items returned
and comparing it with the items requested per page (consts.MAX_ITEMS_PER_REQUEST).
"""
offset = 0
while True:
params: list[tuple[str, str]] = [i for i in parameters] + [
('max', str(consts.MAX_ITEMS_PER_REQUEST)),
('offset', str(offset)),
]
response = self.do_request(method, path, *params, data=data)
data = response.get(key, [])
yield from data
# Checke meta information to see if we have more pages
meta = response.get('meta', {})
if not meta: # Do our best to avoid errors if meta is not present
# Check if we have more pages
if len(data) < consts.MAX_ITEMS_PER_REQUEST:
break
elif meta.get('offset', 0) + meta.get('size', 0) >= meta.get('total', 0):
# No more pages, as offset is greater than or equal to total
break
offset += consts.MAX_ITEMS_PER_REQUEST
@cached('test', consts.CACHE_VM_INFO_DURATION, key_helper=caching_key_helper)
def test(self) -> bool:
try:
self.connect(force=True)
except Exception:
# logger.error('Error testing Openshift: %s', e)
return False
return True
# Not cacheable, as it is a generator
def enumerate_instances(
self, *, name: str | None = None, detailed: bool = False, show_deleted: bool = False
) -> collections.abc.Iterator[types.InstanceInfo]:
"""
Get all instances from Openshift
Args:
name (str|None): If provided, filter instances by name (case-insensitive?)
detailed (bool): If True, return detailed information about instances
Yields:
types.Instance: An instance object with the details of each instance
Raises:
OpenshiftError: If there is an error getting the instances
"""
params = [
('showDeleted', str(show_deleted).lower()),
('details', str(detailed).lower()),
]
if name:
params.append(('name', name))
yield from (
types.InstanceInfo.from_dict(instance)
for instance in self.do_paginated_request('GET', 'api/instances', 'instances', *params)
)
@cached('instances', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def list_instances(
self,
*,
name: str | None = None,
detailed: bool = False,
show_deleted: bool = False,
force: bool = False,
) -> list[types.InstanceInfo]:
return list(self.enumerate_instances(name=name, detailed=detailed, show_deleted=show_deleted))
@cached('instance', consts.CACHE_VM_INFO_DURATION, key_helper=caching_key_helper)
def get_instance_info(self, instance_id: str | int, force: bool = False) -> types.InstanceInfo:
"""
Get a specific instance by ID
"""
OpenshiftClient.validate_instance_id(instance_id)
response = self.do_request(
'GET',
f'api/instances/{instance_id}',
('details', 'true'),
)
return types.InstanceInfo.from_dict(response['instance'])
def clone_instance(self, instance_id: str | int, name: str, group_id: int | None = None) -> None:
"""
Clone a specific instance by ID
TODO: maybe we can change the network interface configuration
"networkInterfaces": [{ "network": { "id": "subnet-12542" } }]
"""
# "https://172.27.0.1/api/instances/{id}/clone"
# Params: name, group: {'id': id} (optional)
OpenshiftClient.validate_instance_id(instance_id)
data: dict[str, typing.Any] = {
'name': name,
}
if group_id is not None:
data['group'] = {'id': group_id}
self.do_request(
'PUT',
f'api/instances/{instance_id}/clone',
data=data,
check_for_success=True,
)
def start_instance(self, instance_id: str | int) -> None:
"""
Start a specific instance by ID
Args:
instance_id (int): The ID of the instance to start
Raises:
OpenshiftError: If there is an error starting the instance
"""
OpenshiftClient.validate_instance_id(instance_id)
self.do_request(
'PUT',
f'api/instances/{instance_id}/start',
check_for_success=True,
)
def stop_instance(self, instance_id: str | int) -> None:
"""
Stop a specific instance by ID
Args:
instance_id (int): The ID of the instance to stop
force (bool): If True, force stop the instance
Raises:
OpenshiftError: If there is an error stopping the instance
"""
OpenshiftClient.validate_instance_id(instance_id)
self.do_request(
'PUT',
f'api/instances/{instance_id}/stop',
check_for_success=True,
)
def restart_instance(self, instance_id: str | int) -> None:
"""
Restart a specific instance by ID
Args:
instance_id (int): The ID of the instance to restart
force (bool): If True, force restart the instance
Raises:
OpenshiftError: If there is an error restarting the instance
"""
OpenshiftClient.validate_instance_id(instance_id)
self.do_request(
'PUT',
f'api/instances/{instance_id}/restart',
check_for_success=True,
)
def delete_instance(self, instance_id: str | int, force: bool = False) -> None:
"""
Delete a specific instance by ID
Args:
instance_id (int): The ID of the instance to delete
Raises:
OpenshiftError: If there is an error deleting the instance
"""
OpenshiftClient.validate_instance_id(instance_id)
params = [('force', 'true')] if force else []
self.do_request(
'DELETE',
f'api/instances/{instance_id}',
*params,
check_for_success=True,
)
def enumerate_groups(self) -> collections.abc.Iterator[types.BasicInfo]:
"""
Get all groups from Openshift
Yields:
types.IdValuePair: An IdValuePair object with the details of each group
"""
yield from (
types.BasicInfo.from_dict(group)
for group in self.do_paginated_request('GET', 'api/groups', 'groups')
)
@cached('groups', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def list_groups(self, force: bool = False) -> list[types.BasicInfo]:
"""
List all groups available in Openshift
Returns:
list[types.IdValuePair]: A list of IdValuePair objects representing the groups
"""
return list(self.enumerate_groups())
def enumerate_clouds(self, group_id: int | None = None) -> collections.abc.Iterator[types.BasicInfo]:
"""
Get all clouds from Openshift
Args:
group (str|None): If provided, filter clouds by group name
Yields:
types.BasicInfo: An IdValuePair object with the details of each cloud
"""
parameters: list[tuple[str, str]] = []
if group_id is not None:
parameters.append(('groupId', str(group_id)))
yield from (
types.BasicInfo.from_dict(cloud)
for cloud in self.do_paginated_request('GET', 'api/zones', 'zones', *parameters)
)
@cached('clouds', consts.CACHE_INFO_DURATION, key_helper=caching_key_helper)
def list_clouds(self, group_id: int | None = None, force: bool = False) -> list[types.BasicInfo]:
"""
List all clouds available in Openshift
Returns:
list[types.IdValuePair]: A list of IdValuePair objects representing the clouds
"""
return list(self.enumerate_clouds(group_id=group_id))
@staticmethod
def validate_instance_id(instance_id: str | int) -> None:
try:
int(instance_id)
except ValueError:
raise exceptions.OpenshiftNotFoundError(f'Instance {instance_id} not found')

View File

@@ -1,11 +1,8 @@
import collections.abc
import dataclasses import dataclasses
import enum import enum
import typing import typing
import logging import logging
from typing import TypedDict, Any
from . import exceptions from . import exceptions
@@ -236,193 +233,3 @@ class InstanceInfo:
except Exception as e: # Any exception during parsing will generate a "null" except Exception as e: # Any exception during parsing will generate a "null"
logger.error(f'Error creating Instance from dict: {e}') logger.error(f'Error creating Instance from dict: {e}')
return InstanceInfo.null() return InstanceInfo.null()
#* --- OpenShift resource TypedDicts ---
@dataclasses.dataclass
class VMMetadata:
name: str
namespace: str
uid: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMMetadata':
return VMMetadata(
name=dictionary.get('name', ''),
namespace=dictionary.get('namespace', ''),
uid=dictionary.get('uid', ''),
)
@dataclasses.dataclass
class VMVolumeTemplate:
name: str
storage: str
storage_class: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMVolumeTemplate':
meta = dictionary.get('metadata', {})
spec = dictionary.get('spec', {})
storage = spec.get('storage', {})
resources = storage.get('resources', {})
requests = resources.get('requests', {})
return VMVolumeTemplate(
name=meta.get('name', ''),
storage=requests.get('storage', ''),
storage_class=storage.get('storageClassName', ''),
)
@dataclasses.dataclass
class VMInterface:
name: str
model: str
mac_address: str
state: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMInterface':
return VMInterface(
name=dictionary.get('name', ''),
model=dictionary.get('model', ''),
mac_address=dictionary.get('macAddress', ''),
state=dictionary.get('state', ''),
)
@dataclasses.dataclass
class VMDeviceDisk:
name: str
boot_order: int
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDeviceDisk':
return VMDeviceDisk(
name=dictionary.get('name', ''),
boot_order=dictionary.get('bootOrder', 0),
)
@dataclasses.dataclass
class VMVolume:
name: str
data_volume: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMVolume':
dv = dictionary.get('dataVolume', {})
return VMVolume(
name=dictionary.get('name', ''),
data_volume=dv.get('name', ''),
)
@dataclasses.dataclass
class VMDomain:
architecture: str
disks: list[VMDeviceDisk]
interfaces: list[VMInterface]
volumes: list[VMVolume]
subdomain: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDomain':
return VMDomain(
architecture=dictionary.get('architecture', ''),
disks=[VMDeviceDisk.from_dict(disk) for disk in dictionary.get('domain', {}).get('devices', {}).get('disks', [])],
interfaces=[VMInterface.from_dict(iface) for iface in dictionary.get('domain', {}).get('devices', {}).get('interfaces', [])],
volumes=[VMVolume.from_dict(vol) for vol in dictionary.get('volumes', [])],
subdomain=dictionary.get('subdomain', ''),
)
@dataclasses.dataclass
class VMStatus:
printable_status: str
run_strategy: str
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMStatus':
return VMStatus(
printable_status=dictionary.get('printableStatus', ''),
run_strategy=dictionary.get('runStrategy', ''),
)
@dataclasses.dataclass
class VMDefinition:
metadata: VMMetadata
run_strategy: str
status: VMStatus
volume_template: VMVolumeTemplate
domain: VMDomain
@staticmethod
def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'VMDefinition':
spec = dictionary.get('spec', {})
template = spec.get('template', {}).get('spec', {})
return VMDefinition(
metadata=VMMetadata.from_dict(dictionary.get('metadata', {})),
run_strategy=spec.get('runStrategy', ''),
status=VMStatus.from_dict(dictionary.get('status', {})),
volume_template=VMVolumeTemplate.from_dict(spec.get('dataVolumeTemplates', [{}])[0]),
domain=VMDomain.from_dict(template),
)
class ManagedFieldType(TypedDict, total=False):
apiVersion: str
fieldsType: str
fieldsV1: dict[str, Any]
manager: str
operation: str
time: str
subresource: str
class MetadataType(TypedDict, total=False):
name: str
namespace: str
uid: str
resourceVersion: str
creationTimestamp: str
annotations: dict[str, Any]
labels: dict[str, Any]
managedFields: list[ManagedFieldType]
finalizers: list[str]
generation: int
class VMItemType(TypedDict, total=False):
apiVersion: str
kind: str
metadata: MetadataType
spec: dict[str, Any]
status: dict[str, Any]
class VMListType(TypedDict, total=False):
apiVersion: str
kind: str
items: list[VMItemType]
metadata: dict[str, Any]
class NADSpecType(TypedDict, total=False):
config: str
class NADItemType(TypedDict, total=False):
apiVersion: str
kind: str
metadata: MetadataType
spec: NADSpecType
class NADListType(TypedDict, total=False):
apiVersion: str
kind: str
items: list[NADItemType]
metadata: dict[str, Any]
class SCItemType(TypedDict, total=False):
metadata: MetadataType
provisioner: str
parameters: dict[str, Any]
reclaimPolicy: str
volumeBindingMode: str
class SCListType(TypedDict, total=False):
apiVersion: str
kind: str
items: list[SCItemType]
metadata: dict[str, Any]

View File

Before

Width:  |  Height:  |  Size: 2.8 KiB

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

@@ -15,7 +15,7 @@ from django.utils.translation import gettext_noop as _
from uds.core import types as core_types, consts from uds.core import types as core_types, consts
from uds.core.services import ServiceProvider from uds.core.services import ServiceProvider
from uds.core.ui import gui from uds.core.ui import gui
from uds.core.util import fields from uds.core.util import validators, fields
from uds.core.util.decorators import cached from uds.core.util.decorators import cached
from .openshift import client from .openshift import client
@@ -38,66 +38,59 @@ class OpenshiftProvider(ServiceProvider):
icon_file = 'provider.png' icon_file = 'provider.png'
# Gui # Gui
cluster_url = gui.TextField( host = gui.TextField(
order=1, order=1,
length=128,
label=_('Cluster OAuth URL'),
tooltip=_('Openshift OAuth URL, e.g. https://oauth-openshift.apps-crc.testing'),
required=True,
default='https://oauth-openshift.apps-crc.testing',
)
api_url = gui.TextField(
order=2,
length=128,
label=_('API URL'),
tooltip=_('Openshift API URL, e.g. https://localhost:6443'),
required=True,
default='https://localhost:6443',
)
username = gui.TextField(
order=3,
length=64, length=64,
label=_('Host'),
tooltip=_('Openshift Server IP or Hostname'),
required=True,
)
port = gui.NumericField(
order=2,
length=3,
label=_('Port'),
default=443,
tooltip=_('Openshift Server Port (default 443)'),
required=True,
)
verify_ssl = fields.verify_ssl_field(order=3)
username = gui.TextField(
order=4,
length=32,
label=_('Username'), label=_('Username'),
tooltip=_('User with valid privileges on Openshift Server'), tooltip=_('User with valid privileges on Openshift Server'),
required=True, required=True,
default='kubeadmin',
) )
password = gui.PasswordField( password = gui.PasswordField(
order=4, order=5,
length=64, length=32,
label=_('Password'), label=_('Password'),
tooltip=_('Password of the user of Openshift Server'), tooltip=_('Password of the user of Openshift Server'),
required=True, required=True,
default='Tn5u8-9k9I9-6WF3Y-q5hSB',
) )
namespace = gui.TextField(
order=5,
length=64,
label=_('Namespace'),
tooltip=_('Openshift namespace to use (default: "default")'),
required=True,
default='default',
)
verify_ssl = fields.verify_ssl_field(order=6)
concurrent_creation_limit = fields.concurrent_creation_limit_field() concurrent_creation_limit = fields.concurrent_creation_limit_field()
concurrent_removal_limit = fields.concurrent_removal_limit_field() concurrent_removal_limit = fields.concurrent_removal_limit_field()
timeout = fields.timeout_field() timeout = fields.timeout_field()
_cached_api: typing.Optional['client.OpenshiftClient'] = None _cached_api: typing.Optional['client.OpenshiftClient'] = None
# There is more fields type, but not here the best place to cover it
def initialize(self, values: 'core_types.core.ValuesType') -> None: def initialize(self, values: 'core_types.core.ValuesType') -> None:
# No port validation needed, URLs are used if values:
pass self.port.value = validators.validate_port(self.port.value)
@property @property
def api(self) -> 'client.OpenshiftClient': def api(
self,
) -> 'client.OpenshiftClient':
if self._cached_api is None: if self._cached_api is None:
self._cached_api = client.OpenshiftClient( self._cached_api = client.OpenshiftClient(
cluster_url=self.cluster_url.value, host=self.host.value,
api_url=self.api_url.value, port=self.port.as_int(),
username=self.username.value, username=self.username.value,
password=self.password.value, password=self.password.value,
namespace=self.namespace.value or 'default',
cache=self.cache, cache=self.cache,
timeout=self.timeout.as_int(), timeout=self.timeout.as_int(),
verify_ssl=self.verify_ssl.as_bool(), verify_ssl=self.verify_ssl.as_bool(),
@@ -107,6 +100,23 @@ class OpenshiftProvider(ServiceProvider):
def test_connection(self) -> bool: def test_connection(self) -> bool:
return self.api.test() return self.api.test()
# def test_connection(self) -> bool:
# return self.api.test()
# def get_task_info(self, task_id: str) -> nu_types.TaskInfo:
# try:
# return self.api.get_task_info(task_id)
# except nu_exceptions.AcropolisConnectionError:
# raise
# except Exception as e:
# logger.error('Exception obtaining Openshift task info: %s', e)
# return nu_types.TaskInfo(
# state=nu_types.TaskState.UNKNOWN, error={'error': str(e)}, entities=[]
# )
# def get_macs_range(self) -> str:
# return self.macs_range.value
@cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT) @cached('reachable', consts.cache.SHORT_CACHE_TIMEOUT)
def is_available(self) -> bool: def is_available(self) -> bool:
return self.api.test() return self.api.test()

View File

@@ -41,7 +41,6 @@ class OpenshiftTemplatePublication(DynamicPublication, autoserializable.AutoSeri
# We need to wait for the name te be created, but don't want to loose the _name # We need to wait for the name te be created, but don't want to loose the _name
self._waiting_name = True self._waiting_name = True
instance_info = self.service().api.get_instance_info(self.service().template.value) instance_info = self.service().api.get_instance_info(self.service().template.value)
logger.debug('Instance info for cloning: %s', instance_info)
if instance_info.status.is_cloning(): if instance_info.status.is_cloning():
self.retry_later() self.retry_later()
elif not instance_info.status.is_cloneable(): elif not instance_info.status.is_cloneable():

View File

Before

Width:  |  Height:  |  Size: 2.8 KiB

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

@@ -97,31 +97,14 @@ class OpenshiftService(DynamicService): # pylint: disable=too-many-public-metho
self.basename.value = validators.validate_basename(self.basename.value, length=self.lenname.as_int()) self.basename.value = validators.validate_basename(self.basename.value, length=self.lenname.as_int())
def init_gui(self) -> None:
self.prov_uuid.value = self.provider().get_uuid()
vm_items = self.api.enumerate_vms()
choices = []
logger.debug('VMs found: %s', vm_items)
for vm in vm_items:
name = vm.get('metadata', {}).get('name', 'UNKNOWN')
namespace = vm.get('metadata', {}).get('namespace', '')
# Exclude templates whose name starts with 'UDS-'
if name.upper().startswith('UDS-'):
continue
# Show namespace if not default
label = f"{name} ({namespace})" if namespace and namespace != 'default' else name
choices.append(gui.choice_item(name, label))
self.template.set_choices(choices)
def init_gui(self) -> None: def init_gui(self) -> None:
self.prov_uuid.value = self.provider().get_uuid() self.prov_uuid.value = self.provider().get_uuid()
self.template.set_choices( self.template.set_choices(
[ [
gui.choice_item(str(template.metadata.uid), f'{template.metadata.name} ({template.metadata.namespace})') gui.choice_item(str(template.id), f'{template.name} ({template.tenant.name})')
for template in self.provider().api.list_vms() for template in self.provider().api.list_instances()
if not template.metadata.name.startswith('UDS-') if template.is_usable() and not template.name.startswith('UDS-')
] ]
) )
@@ -204,7 +187,7 @@ class OpenshiftService(DynamicService): # pylint: disable=too-many-public-metho
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None: ) -> None:
""" """
Shutdowns the machine, same as stop (both tries soft shutdown, it's a openshift thing) Shutdowns the machine, same as stop (both tries soft shutdown, it's a Openshift thing)
""" """
self.api.stop_instance(vmid) self.api.stop_instance(vmid)

View File

@@ -42,7 +42,7 @@ from .deployment_fixed import OpenshiftUserServiceFixed
# Not imported at runtime, just for type checking # Not imported at runtime, just for type checking
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from .openshift import client from .Openshift import client
from .provider import OpenshiftProvider from .provider import OpenshiftProvider
from uds.core.services.generics.fixed.userservice import FixedUserService from uds.core.services.generics.fixed.userservice import FixedUserService
@@ -99,18 +99,18 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
# Uses default FixedService.initialize # Uses default FixedService.initialize
def init_gui(self) -> None: def init_gui(self) -> None:
self.prov_uuid.value = self.provider().get_uuid() self.prov_uuid.value = self.provider().get_uuid()
self.machines.set_choices( self.machines.set_choices(
[ [
gui.choice_item(str(machine.metadata.uid), f'{machine.metadata.name} ({machine.metadata.namespace})') gui.choice_item(str(template.id), f'{template.name} ({template.tenant.name})')
for machine in self.provider().api.list_vms() for template in self.provider().api.list_instances()
if not machine.metadata.name.startswith('UDS-') if template.is_usable() and not template.name.startswith('UDS-')
] ]
) )
def provider(self) -> 'OpenshiftProvider': def provider(self) -> 'OpenshiftProvider':
return typing.cast('OpenshiftProvider', super().provider()) return typing.cast('OpenshiftProvider', super().provider())
@@ -119,17 +119,9 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
def enumerate_assignables(self) -> collections.abc.Iterable[types.ui.ChoiceItem]: def enumerate_assignables(self) -> collections.abc.Iterable[types.ui.ChoiceItem]:
# Obtain machines names and ids for asignables # Obtain machines names and ids for asignables
servers = {} servers = {
vm_items = self.api.enumerate_instances() str(server.id): server.name for server in self.api.list_instances() if not server.name.startswith('UDS-') and server.is_usable()
for vm in vm_items: }
name = vm.get('metadata', {}).get('name', 'UNKNOWN')
namespace = vm.get('metadata', {}).get('namespace', '')
# Exclude templates whose name starts with 'UDS-'
if name.startswith('UDS-'):
continue
# Show namespace if not default
label = f"{name} ({namespace})" if namespace and namespace != 'default' else name
servers[name] = label
with self._assigned_access() as assigned_servers: with self._assigned_access() as assigned_servers:
return [ return [
@@ -184,7 +176,7 @@ class OpenshiftServiceFixed(FixedService): # pylint: disable=too-many-public-me
self.do_log(types.log.LogLevel.INFO, f'Stopping machine {vmid} after logout') self.do_log(types.log.LogLevel.INFO, f'Stopping machine {vmid} after logout')
def get_mac(self, vmid: str) -> str: def get_mac(self, vmid: str) -> str:
# No mac on openshift instances, return ip instead # No mac on Openshift instances, return ip instead
return self.api.get_instance_info(vmid).validate().interfaces[0].mac_address if vmid else '' return self.api.get_instance_info(vmid).validate().interfaces[0].mac_address if vmid else ''
def get_ip(self, vmid: str) -> str: def get_ip(self, vmid: str) -> str: