1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-11-16 04:24:19 +03:00

refactor: Remove cloning functionality from OpenshiftClient and related tests

This commit is contained in:
Janier Rodríguez
2025-10-31 17:36:41 +01:00
parent daa8e3677e
commit 3bbbc9d5dd
12 changed files with 1504 additions and 143 deletions

View File

@@ -297,33 +297,28 @@ class OpenshiftClient:
"""
Monitor the clone process of a virtual machine.
"""
clone_url = f"{api_url}/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
path = f"/apis/clone.kubevirt.io/v1alpha1/namespaces/{namespace}/virtualmachineclones/{clone_name}"
logging.info("Monitoring clone process for '%s'...", clone_name)
while True:
try:
response = requests.get(clone_url, headers=headers, verify=False)
if response.status_code == 200:
clone_data = response.json()
status = clone_data.get('status', {})
phase = status.get('phase', 'Unknown')
logging.info("Phase: %s", phase)
for condition in status.get('conditions', []):
ctype = condition.get('type', '')
cstatus = condition.get('status', '')
cmsg = condition.get('message', '')
logging.info(" %s: %s - %s", ctype, cstatus, cmsg)
if phase == 'Succeeded':
logging.info("Clone '%s' completed successfully!", clone_name)
break
elif phase == 'Failed':
logging.error("Clone '%s' failed!", clone_name)
break
elif response.status_code == 404:
logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name)
response = self.do_request('GET', path)
status = response.get('status', {})
phase = status.get('phase', 'Unknown')
logging.info("Phase: %s", phase)
for condition in status.get('conditions', []):
ctype = condition.get('type', '')
cstatus = condition.get('status', '')
cmsg = condition.get('message', '')
logging.info(" %s: %s - %s", ctype, cstatus, cmsg)
if phase == 'Succeeded':
logging.info("Clone '%s' completed successfully!", clone_name)
break
else:
logging.error("Error monitoring clone: %d", response.status_code)
elif phase == 'Failed':
logging.error("Clone '%s' failed!", clone_name)
break
except exceptions.OpenshiftNotFoundError:
logging.warning("Clone resource '%s' not found. May have been cleaned up.", clone_name)
break
except Exception as e:
logging.error("Monitoring exception: %s", e)
logging.info("Waiting %d seconds before next check...", polling_interval)
@@ -333,19 +328,16 @@ class OpenshiftClient:
"""
Returns the name of the PVC or DataVolume used by the VM.
"""
vm_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
response = requests.get(vm_url, headers=headers, verify=False)
if response.status_code == 200:
vm_obj = response.json()
volumes = vm_obj.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", [])
for vol in volumes:
pvc = vol.get("persistentVolumeClaim")
if pvc:
return pvc.get("claimName"), "pvc"
dv = vol.get("dataVolume")
if dv:
return dv.get("name"), "dv"
path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
response = self.do_request('GET', path)
volumes = response.get("spec", {}).get("template", {}).get("spec", {}).get("volumes", [])
for vol in volumes:
pvc = vol.get("persistentVolumeClaim")
if pvc:
return pvc.get("claimName"), "pvc"
dv = vol.get("dataVolume")
if dv:
return dv.get("name"), "dv"
raise Exception(f"No PVC or DataVolume found in VM {vm_name}")
def get_datavolume_phase(self, datavolume_name: str) -> str:
@@ -353,13 +345,10 @@ class OpenshiftClient:
Get the phase of a DataVolume.
Returns the phase as a string.
"""
url = f"{self.api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{self.namespace}/datavolumes/{datavolume_name}"
headers = {'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/json'}
path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{self.namespace}/datavolumes/{datavolume_name}"
try:
response = requests.get(url, headers=headers, verify=self._verify_ssl, timeout=self._timeout)
if response.status_code == 200:
dv = response.json()
return dv.get('status', {}).get('phase', '')
response = self.do_request('GET', path)
return response.get('status', {}).get('phase', '')
except Exception:
pass
return ''
@@ -369,17 +358,14 @@ class OpenshiftClient:
Get the size of a DataVolume.
Returns the size as a string.
"""
url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
dv = response.json()
size = dv.get("status", {}).get("amount", None)
if size:
return size
return (
dv.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or ""
)
path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{dv_name}"
response = self.do_request('GET', path)
size = response.get("status", {}).get("amount", None)
if size:
return size
return (
response.get("spec", {}).get("pvc", {}).get("resources", {}).get("requests", {}).get("storage") or ""
)
raise Exception(f"Could not get the size of DataVolume {dv_name}")
def get_pvc_size(self, api_url: str, namespace: str, pvc_name: str) -> str:
@@ -387,14 +373,11 @@ class OpenshiftClient:
Get the size of a PVC.
Returns the size as a string.
"""
url = f"{api_url}/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
pvc = response.json()
capacity = pvc.get("status", {}).get("capacity", {}).get("storage")
if capacity:
return capacity
path = f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{pvc_name}"
response = self.do_request('GET', path)
capacity = response.get("status", {}).get("capacity", {}).get("storage")
if capacity:
return capacity
raise Exception(f"Could not get the size of PVC {pvc_name}")
def clone_pvc_with_datavolume(
@@ -410,12 +393,7 @@ class OpenshiftClient:
Clone a PVC using a DataVolume.
Returns True if the DataVolume was created successfully, else False.
"""
dv_url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes"
headers = {
"Authorization": f"Bearer {self.get_token()}",
"Accept": "application/json",
"Content-Type": "application/json",
}
path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes"
body: dict[str, typing.Any] = {
"apiVersion": "cdi.kubevirt.io/v1beta1",
"kind": "DataVolume",
@@ -429,12 +407,13 @@ class OpenshiftClient:
},
},
}
response = requests.post(dv_url, headers=headers, json=body, verify=False)
if response.status_code == 201:
try:
self.do_request('POST', path, data=body)
logging.info(f"DataVolume '{cloned_pvc_name}' created successfully")
return True
logging.error(f"Failed to create DataVolume: {response.status_code} {response.text}")
return False
except Exception as e:
logging.error(f"Failed to create DataVolume: {e}")
return False
def create_vm_from_pvc(
self,
@@ -449,16 +428,13 @@ class OpenshiftClient:
Create a new VM from a cloned PVC using DataVolumeTemplates.
Returns True if the VM was created successfully, else False.
"""
original_vm_url = (
f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}"
)
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
resp = requests.get(original_vm_url, headers=headers, verify=False)
if resp.status_code != 200:
logging.error(f"Could not get source VM: {resp.status_code} {resp.text}")
path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{source_vm_name}"
try:
vm_obj = self.do_request('GET', path)
except Exception as e:
logging.error(f"Could not get source VM: {e}")
return False
vm_obj = resp.json()
vm_obj['metadata']['name'] = new_vm_name
for k in ['resourceVersion', 'uid', 'selfLink']:
@@ -504,28 +480,27 @@ class OpenshiftClient:
logger.info(f"Creating VM '{new_vm_name}' from cloned PVC '{new_dv_name}'.")
logger.info(f"VM Object: {vm_obj}")
create_url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines"
headers["Content-Type"] = "application/json"
resp = requests.post(create_url, headers=headers, json=vm_obj, verify=False)
if resp.status_code == 201:
create_path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines"
try:
self.do_request('POST', create_path, data=vm_obj)
logging.info(f"VM '{new_vm_name}' created successfully with DataVolumeTemplate.")
return True
logging.error(f"Error creating VM: {resp.status_code} {resp.text}")
return False
except Exception as e:
logging.error(f"Error creating VM: {e}")
return False
def delete_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
"""
Delete a VM by name.
Returns True if the VM was deleted successfully, else False.
"""
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
response = requests.delete(url, headers=headers, verify=False)
if response.status_code in [200, 202]:
path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
try:
self.do_request('DELETE', path)
logging.info(f"VM {vm_name} deleted successfully.")
return True
else:
logging.error(f"Error deleting VM {vm_name}: {response.status_code} - {response.text}")
except Exception as e:
logging.error(f"Error deleting VM {vm_name}: {e}")
return False
def wait_for_datavolume_clone_progress(
@@ -535,14 +510,12 @@ class OpenshiftClient:
Wait for a DataVolume clone to complete.
Returns True if the clone completed successfully, else False.
"""
url = f"{api_url}/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}"
headers = {"Authorization": f"Bearer {self.get_token()}", "Accept": "application/json"}
path = f"/apis/cdi.kubevirt.io/v1beta1/namespaces/{namespace}/datavolumes/{datavolume_name}"
start = time.time()
while time.time() - start < timeout:
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
dv = response.json()
status = dv.get('status', {})
try:
response = self.do_request('GET', path)
status = response.get('status', {})
phase = status.get('phase')
progress = status.get('progress', 'N/A')
logging.info(f"DataVolume {datavolume_name} status: {phase}, progress: {progress}")
@@ -552,8 +525,8 @@ class OpenshiftClient:
elif phase == 'Failed':
logging.error(f"DataVolume {datavolume_name} clone failed")
return False
else:
logging.error(f"Error querying DataVolume {datavolume_name}: {response.status_code}")
except Exception as e:
logging.error(f"Error querying DataVolume {datavolume_name}: {e}")
time.sleep(polling_interval)
logging.error(f"Timeout waiting for DataVolume {datavolume_name} clone")
return False
@@ -563,40 +536,47 @@ class OpenshiftClient:
Start a VM by name.
Returns True if the VM was started successfully, else False.
"""
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/merge-patch+json",
"Accept": "application/json",
}
body: dict[str, typing.Any] = {"spec": {"runStrategy": "Always"}}
response = requests.patch(url, headers=headers, json=body, verify=False)
if response.status_code in [200, 201]:
logging.info(f"VM {vm_name} started.")
return True
else:
logging.info(f"Error starting VM {vm_name}: {response.status_code} - {response.text}")
# Get Vm info
path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
try:
vm_obj = self.do_request('GET', path)
except Exception as e:
logging.error(f"Could not get source VM: {e}")
return False
# Update runStrategy to Always
vm_obj['spec']['runStrategy'] = 'Always'
try:
self.do_request('PUT', path, data=vm_obj)
logging.info(f"VM {vm_name} will be started.")
return True
except Exception as e:
logging.info(f"Error starting VM {vm_name}: {e}")
return False
def stop_vm(self, api_url: str, namespace: str, vm_name: str) -> bool:
"""
Stop a VM by name.
Returns True if the VM was stopped successfully, else False.
"""
url = f"{api_url}/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
headers = {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/merge-patch+json",
"Accept": "application/json",
}
body: dict[str, typing.Any] = {"spec": {"runStrategy": "Halted"}}
response = requests.patch(url, headers=headers, json=body, verify=False)
if response.status_code in [200, 201]:
# Get Vm info
path = f"/apis/kubevirt.io/v1/namespaces/{namespace}/virtualmachines/{vm_name}"
try:
vm_obj = self.do_request('GET', path)
except Exception as e:
logging.error(f"Could not get source VM: {e}")
return False
# Update runStrategy to Halted
vm_obj['spec']['runStrategy'] = 'Halted'
try:
self.do_request('PUT', path, data=vm_obj)
logging.info(f"VM {vm_name} will be stopped.")
return True
else:
logging.info(f"Error stopping VM {vm_name}: {response.status_code} - {response.text}")
return False
except Exception as e:
logging.info(f"Error starting VM {vm_name}: {e}")
return False
def copy_vm_same_size(
self, api_url: str, namespace: str, source_vm_name: str, new_vm_name: str, storage_class: str
@@ -688,22 +668,3 @@ class OpenshiftClient:
except Exception as e:
logging.error(f"Error deleting VM: {e}")
return False
def clone_vm_instance(self, source_vm_name: str, new_vm_name: str, storage_class: str) -> bool:
"""
Clone a VM by name, creating a new VM with the same size.
Returns True if clone succeeded, False otherwise.
"""
try:
self.copy_vm_same_size(self.api_url, self.namespace, source_vm_name, new_vm_name, storage_class)
return True
except Exception as e:
logging.error(f"Error cloning VM: {e}")
return False
@staticmethod
def validate_vm_id(vm_id: str | int) -> None:
try:
int(vm_id)
except ValueError:
raise exceptions.OpenshiftNotFoundError(f'VM {vm_id} not found')

View File

@@ -0,0 +1,315 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import copy
import functools
import random
import typing
from unittest import mock
import uuid
from uds.core import environment
from uds.core.ui.user_interface import gui
from uds.models.user import User
from unittest import mock
from uds.services.OpenShift import service, service_fixed, provider, publication, deployment, deployment_fixed
from uds.services.OpenShift.openshift import types as openshift_types, exceptions as openshift_exceptions
DEF_VMS: list[openshift_types.VM] = [
openshift_types.VM(
name=f'vm-{i}',
namespace='default',
uid=f'uid-{i}',
status=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING,
volume_template=openshift_types.VolumeTemplate(name=f'volume-{i}', storage='10Gi'),
disks=[openshift_types.DeviceDisk(name=f'disk-{i}', boot_order=1)],
volumes=[openshift_types.Volume(name=f'volume-{i}', data_volume=f'dv-{i}')],
)
for i in range(1, 11)
]
DEF_VM_INSTANCES: list[openshift_types.VMInstance] = [
openshift_types.VMInstance(
name=f'vm-{i}',
namespace='default',
uid=f'uid-instance-{i}',
interfaces=[
openshift_types.Interface(
name='eth0',
mac_address=f'00:11:22:33:44:{i:02x}',
ip_address=f'192.168.1.{i}',
)
],
status=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING,
phase=openshift_types.VMStatus.STOPPED if i % 2 == 0 else openshift_types.VMStatus.RUNNING,
)
for i in range(1, 11)
]
# clone values to avoid modifying the original ones
VMS: list[openshift_types.VM] = copy.deepcopy(DEF_VMS)
VM_INSTANCES: list[openshift_types.VMInstance] = copy.deepcopy(DEF_VM_INSTANCES)
def clear() -> None:
"""
Resets all values to the default ones
"""
VMS[:] = copy.deepcopy(DEF_VMS)
VM_INSTANCES[:] = copy.deepcopy(DEF_VM_INSTANCES)
def replace_vm_info(vm_name: str, **kwargs: typing.Any) -> None:
"""
Set the values of VMS by name
"""
try:
vm = next(vm for vm in VMS if vm.name == vm_name)
for k, v in kwargs.items():
setattr(vm, k, v)
except Exception:
raise openshift_exceptions.OpenshiftNotFoundError(f'VM {vm_name} not found')
def replacer_vm_info(**kwargs: typing.Any) -> typing.Callable[..., None]:
return functools.partial(replace_vm_info, **kwargs)
T = typing.TypeVar('T')
def returner(value: T, *args: typing.Any, **kwargs: typing.Any) -> typing.Callable[..., T]:
def inner(*args: typing.Any, **kwargs: typing.Any) -> T:
return value
return inner
# Provider values
PROVIDER_VALUES_DICT: gui.ValuesDictType = {
'cluster_url': 'https://oauth-openshift.apps-crc.testing',
'api_url': 'https://api.crc.testing:6443',
'username': 'kubeadmin',
'password': 'test-password',
'namespace': 'default',
'verify_ssl': False,
'concurrent_creation_limit': 1,
'concurrent_removal_limit': 1,
'timeout': 10,
}
# Service values
SERVICE_VALUES_DICT: gui.ValuesDictType = {
'template': VMS[0].name,
'basename': 'base',
'lenname': 4,
'publication_timeout': 120,
'prov_uuid': '',
}
# Service fixed values
SERVICE_FIXED_VALUES_DICT: gui.ValuesDictType = {
'token': '',
'machines': [VMS[2].name, VMS[3].name, VMS[4].name],
'on_logout': 'no',
'randomize': False,
'maintain_on_error': False,
'prov_uuid': '',
}
def create_client_mock() -> mock.Mock:
"""
Create a mock of OpenshiftClient
"""
client = mock.MagicMock()
vms = copy.deepcopy(DEF_VMS)
vm_instances = copy.deepcopy(DEF_VM_INSTANCES)
# Setup client methods
client.test.return_value = True
client.list_vms.return_value = vms
client.get_vm_info.return_value = lambda vm_name: next((vm for vm in vms if vm.name == vm_name), None) # type: ignore[arg-type]
client.get_vm_instance_info.return_value = lambda vm_name: next((vmi for vmi in vm_instances if vmi.name == vm_name), None) # type: ignore[arg-type]
client.start_vm_instance.return_value = True
client.stop_vm_instance.return_value = True
client.delete_vm_instance.return_value = True
client.get_datavolume_phase.return_value = 'Succeeded'
client.get_vm_pvc_or_dv_name.return_value = ('test-pvc', 'pvc')
client.get_pvc_size.return_value = '10Gi'
client.create_vm_from_pvc.return_value = True
client.wait_for_datavolume_clone_progress.return_value = True
return client
@contextlib.contextmanager
def patched_provider(
**kwargs: typing.Any,
) -> typing.Generator[provider.OpenshiftProvider, None, None]:
client = create_client_mock()
prov = create_provider(**kwargs)
prov._cached_api = client
yield prov
def create_provider(**kwargs: typing.Any) -> provider.OpenshiftProvider:
"""
Create a provider
"""
values = PROVIDER_VALUES_DICT.copy()
values.update(kwargs)
uuid_ = str(uuid.uuid4())
return provider.OpenshiftProvider(
environment=environment.Environment.private_environment(uuid_), values=values, uuid=uuid_
)
def create_service(
provider: typing.Optional[provider.OpenshiftProvider] = None, **kwargs: typing.Any
) -> service.OpenshiftService:
"""
Create a dynamic service
"""
uuid_ = str(uuid.uuid4())
values = SERVICE_VALUES_DICT.copy()
values.update(kwargs)
srvc = service.OpenshiftService(
environment=environment.Environment.private_environment(uuid_),
provider=provider or create_provider(),
values=values,
uuid=uuid_,
)
return srvc
def create_service_fixed(
provider: typing.Optional[provider.OpenshiftProvider] = None, **kwargs: typing.Any
) -> service_fixed.OpenshiftServiceFixed:
"""
Create a fixed service
"""
uuid_ = str(uuid.uuid4())
values = SERVICE_FIXED_VALUES_DICT.copy()
values.update(kwargs)
return service_fixed.OpenshiftServiceFixed(
environment=environment.Environment.private_environment(uuid_),
provider=provider or create_provider(),
values=values,
uuid=uuid_,
)
def create_publication(
service: typing.Optional[service.OpenshiftService] = None,
**kwargs: typing.Any,
) -> publication.OpenshiftTemplatePublication:
"""
Create a publication
"""
uuid_ = str(uuid.uuid4())
pub = publication.OpenshiftTemplatePublication(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service(**kwargs),
revision=1,
servicepool_name='servicepool_name',
uuid=uuid_,
)
pub._name = f"pub-{random.randint(1000, 9999)}"
return pub
def create_userservice(
service: typing.Optional[service.OpenshiftService] = None,
publication: typing.Optional[publication.OpenshiftTemplatePublication] = None,
) -> deployment.OpenshiftUserService:
"""
Create a dynamic user service
"""
uuid_ = str(uuid.uuid4())
return deployment.OpenshiftUserService(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service(),
publication=publication or create_publication(),
uuid=uuid_,
)
def create_userservice_fixed(
service: typing.Optional[service_fixed.OpenshiftServiceFixed] = None,
) -> deployment_fixed.OpenshiftUserServiceFixed:
"""
Create a fixed user service
"""
uuid_ = str(uuid.uuid4().hex)
return deployment_fixed.OpenshiftUserServiceFixed(
environment=environment.Environment.private_environment(uuid_),
service=service or create_service_fixed(),
publication=None,
uuid=uuid_,
)
def create_user(
name: str = "testuser",
real_name: str = "Test User",
is_admin: bool = False,
state: str = 'A',
password: str = 'password',
mfa_data: str = '',
staff_member: bool = False,
last_access: typing.Optional[str] = None,
parent: typing.Optional[User] = None,
created: typing.Optional[str] = None,
comments: str = '',
) -> User:
"""
Creates a mock User instance for testing purposes.
"""
user = mock.Mock(spec=User)
user.name = name
user.real_name = real_name
user.is_admin = is_admin
user.state = state
user.password = password
user.mfa_data = mfa_data
user.staff_member = staff_member
user.last_access = last_access
user.parent = parent
user.created = created
user.comments = comments
return user

View File

@@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
from uds.services.OpenShift.openshift import client as openshift_client
from tests.utils.test import UDSTransactionTestCase
from tests.utils import vars
logger = logging.getLogger(__name__)
class TestOpenshiftClient(UDSTransactionTestCase):
"""Tests for operations with OpenShiftClient."""
os_client: openshift_client.OpenshiftClient
test_vm: str = ''
test_pool: str = ''
test_storage: str = ''
def setUp(self) -> None:
v = vars.get_vars('openshift')
if not v:
self.skipTest('No OpenShift test variables found')
self.os_client = openshift_client.OpenshiftClient(
cluster_url=v['cluster_url'],
api_url=v['api_url'],
username=v['username'],
password=v['password'],
namespace=v['namespace'],
timeout=int(v['timeout']),
verify_ssl=v['verify_ssl'] == 'true',
)
self.test_vm = v.get('test_vm', '')
self.test_pool = v.get('test_pool', '')
self.test_storage = v.get('test_storage', '')
# --- Token/API Tests ---
def test_get_token(self) -> None:
token = self.os_client.get_token()
self.assertIsNotNone(token)
def test_get_api_url(self) -> None:
url = self.os_client.get_api_url('/test/path', ('param1', 'value1'))
self.assertIn('/test/path', url)
self.assertIn('param1=value1', url)
def test_get_api_url_invalid(self):
url = self.os_client.get_api_url('/invalid/path', ('param', 'value'))
self.assertIn('/invalid/path', url)
# --- VM Listing/Info Tests ---
def test_list_vms(self) -> None:
vms = self.os_client.list_vms()
self.assertIsInstance(vms, list)
if vms:
info = self.os_client.get_vm_info(vms[0].name)
self.assertIsNotNone(info)
def test_list_vms_and_check_fields(self):
vms = self.os_client.list_vms()
self.assertIsInstance(vms, list)
for vm in vms:
self.assertTrue(hasattr(vm, 'name'))
self.assertTrue(hasattr(vm, 'namespace'))
def test_get_vm_info(self):
if not self.test_vm:
self.skipTest('No test_vm specified')
info = self.os_client.get_vm_info(self.test_vm)
self.assertIsNotNone(info)
def test_get_vm_info_invalid(self):
info = self.os_client.get_vm_info('nonexistent-vm')
self.assertIsNone(info)
def test_get_vm_instance_info(self):
if not self.test_vm:
self.skipTest('No test_vm specified')
info = self.os_client.get_vm_instance_info(self.test_vm)
self.assertTrue(info is None or hasattr(info, 'name'))
def test_get_vm_instance_info_invalid(self):
info = self.os_client.get_vm_instance_info('nonexistent-vm')
self.assertIsNone(info)
# --- VM Lifecycle and Actions ---
def test_vm_lifecycle(self) -> None:
self.skipTest('Skip this test to avoid issues in shared environments')
if not self.test_vm:
self.skipTest('No test_vm specified in test-vars.ini')
self.assertTrue(self.os_client.start_vm_instance(self.test_vm))
self.assertTrue(self.os_client.stop_vm_instance(self.test_vm))
self.assertTrue(self.os_client.delete_vm_instance(self.test_vm))
def test_start_stop_suspend_resume_vm(self):
if not self.test_vm:
self.skipTest('No test_vm specified')
#self.assertTrue(self.os_client.start_vm_instance(self.test_vm))
self.assertTrue(self.os_client.stop_vm_instance(self.test_vm))
# Suspend/resume skipped if not supported
def test_delete_vm_invalid(self):
self.assertFalse(self.os_client.delete_vm_instance('nonexistent-vm'))
# --- DataVolume Tests ---
def test_datavolume_phase(self) -> None:
phase = self.os_client.get_datavolume_phase('test-dv')
self.assertIsInstance(phase, str)
def test_datavolume_phase_invalid(self):
phase = self.os_client.get_datavolume_phase('nonexistent-dv')
self.assertIsInstance(phase, str)

View File

@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from uds.core import types, ui, environment
from uds.services.OpenShift.provider import OpenshiftProvider
from . import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftProvider(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.clear()
def test_provider_data(self) -> None:
"""
Test the provider data
"""
provider = fixtures.create_provider()
self.assertEqual(provider.cluster_url.value, fixtures.PROVIDER_VALUES_DICT['cluster_url'])
self.assertEqual(provider.api_url.value, fixtures.PROVIDER_VALUES_DICT['api_url'])
self.assertEqual(provider.username.value, fixtures.PROVIDER_VALUES_DICT['username'])
self.assertEqual(provider.password.value, fixtures.PROVIDER_VALUES_DICT['password'])
self.assertEqual(provider.namespace.value, fixtures.PROVIDER_VALUES_DICT['namespace'])
self.assertEqual(provider.verify_ssl.value, fixtures.PROVIDER_VALUES_DICT['verify_ssl'])
if not isinstance(provider.concurrent_creation_limit, ui.gui.NumericField):
self.fail('concurrent_creation_limit is not a NumericField')
self.assertEqual(
provider.concurrent_creation_limit.as_int(),
fixtures.PROVIDER_VALUES_DICT['concurrent_creation_limit'],
)
if not isinstance(provider.concurrent_removal_limit, ui.gui.NumericField):
self.fail('concurrent_removal_limit is not a NumericField')
self.assertEqual(
provider.concurrent_removal_limit.as_int(),
fixtures.PROVIDER_VALUES_DICT['concurrent_removal_limit'],
)
self.assertEqual(provider.timeout.as_int(), fixtures.PROVIDER_VALUES_DICT['timeout'])
def test_provider_test(self) -> None:
"""
Test the provider test method
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
for ret_val in [True, False]:
api.test.reset_mock()
api.test.return_value = ret_val
result = OpenshiftProvider.test(
environment.Environment.temporary_environment(), fixtures.PROVIDER_VALUES_DICT
)
self.assertIsInstance(result, types.core.TestResult)
self.assertEqual(result.success, ret_val)
self.assertIsInstance(result.error, str)
def test_provider_is_available(self) -> None:
"""
Test the provider is_available method
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
# First, true result
self.assertEqual(provider.is_available(), True)
api.test.assert_called_once_with()
api.test.reset_mock()
# Now, even if set test to false, should return true due to cache
api.test.return_value = False
self.assertEqual(provider.is_available(), True)
api.test.assert_not_called()
# clear cache of method
provider.is_available.cache_clear() # type: ignore
self.assertEqual(provider.is_available(), False)
api.test.assert_called_once_with()
def test_provider_api_methods(self) -> None:
"""
Test provider API methods
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
self.assertEqual(provider.test_connection(), True)
api.test.assert_called_once_with()
self.assertEqual(provider.api.list_vms(), fixtures.VMS)
self.assertEqual(provider.api.get_vm_info('vm-1'), fixtures.VMS[0])
self.assertEqual(provider.api.get_vm_instance_info('vm-1'), fixtures.VM_INSTANCES[0])
self.assertTrue(provider.api.start_vm_instance('vm-1'))
self.assertTrue(provider.api.stop_vm_instance('vm-1'))
self.assertTrue(provider.api.delete_vm_instance('vm-1'))
def test_sanitized_name(self) -> None:
"""
Test name sanitization
"""
provider = fixtures.create_provider()
test_cases = [
('Test-VM-1', 'test-vm-1'),
('Test_VM@2', 'test-vm-2'),
('My Test VM!!!', 'my-test-vm'),
('a' * 100, 'a' * 63), # Test truncation
]
for input_name, expected in test_cases:
self.assertEqual(provider.sanitized_name(input_name), expected)

View File

@@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from uds.core import types
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftPublication(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clear()
def test_publication_creation(self) -> None:
"""
Test publication creation
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
publication = fixtures.create_publication(service=service)
# Mock the publication process
api.get_vm_pvc_or_dv_name.return_value = ('test-pvc', 'pvc')
api.get_pvc_size.return_value = '10Gi'
api.create_vm_from_pvc.return_value = True
api.wait_for_datavolume_clone_progress.return_value = True
api.get_vm_info.return_value = fixtures.VMS[0]
state = publication.publish()
self.assertEqual(state, types.states.State.RUNNING)
# Check that publication process was initiated
api.get_vm_pvc_or_dv_name.assert_called()
api.get_pvc_size.assert_called()
api.create_vm_from_pvc.assert_called()
def test_publication_creation_checker(self) -> None:
"""
Test publication creation checker
"""
with fixtures.patched_provider() as provider:
service = fixtures.create_service(provider=provider)
publication = fixtures.create_publication(service=service)
# Ensure api is a mock so we can set return_value
api = typing.cast(mock.MagicMock, publication.service().api)
# Test when VM is not found yet
publication._waiting_name = True
api.get_vm_info.return_value = None
state = publication.op_create_checker()
self.assertEqual(state, types.states.TaskState.RUNNING)
# Test when VM is found
api.get_vm_info.return_value = fixtures.VMS[0]
state = publication.op_create_checker()
self.assertEqual(state, types.states.TaskState.FINISHED)
def test_publication_completed(self) -> None:
"""
Test publication completion
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
publication = fixtures.create_publication(service=service)
# Test with running VM
running_vm = fixtures.VMS[0]
running_vm.status = fixtures.openshift_types.VMStatus.RUNNING
api.get_vm_info.return_value = running_vm
publication.op_create_completed()
api.stop_vm_instance.assert_called_with(publication._name)
# Test with stopped VM
stopped_vm = fixtures.VMS[0]
stopped_vm.status = fixtures.openshift_types.VMStatus.STOPPED
api.get_vm_info.return_value = stopped_vm
api.reset_mock()
publication.op_create_completed()
api.stop_vm_instance.assert_not_called()
def test_publication_destroy(self) -> None:
"""
Test publication destruction
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
publication = fixtures.create_publication(service=service)
publication._name = 'test-vm'
state = publication.destroy()
self.assertEqual(state, types.states.State.RUNNING)
# Check state should call delete
state = publication.check_state()
self.assertEqual(state, types.states.State.RUNNING)
api.delete_vm_instance.assert_called_with('test-vm')
def test_get_template_id(self) -> None:
"""
Test template ID retrieval
"""
service = fixtures.create_service()
publication = fixtures.create_publication(service=service)
publication._name = 'test-template'
template_id = publication.get_template_id()
self.assertEqual(template_id, 'test-template')

View File

@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftDeploymentSerialization(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clear()
def test_userservice_serialization(self) -> None:
"""
Test user service serialization
"""
userservice = fixtures.create_userservice()
userservice._name = 'test-vm'
userservice._ip = '192.168.1.100'
userservice._mac = '00:11:22:33:44:55'
userservice._vmid = 'test-vm-id'
# Serialize and deserialize
data = pickle.dumps(userservice)
userservice2 = pickle.loads(data)
self.assertEqual(userservice2._name, 'test-vm')
self.assertEqual(userservice2._ip, '192.168.1.100')
self.assertEqual(userservice2._mac, '00:11:22:33:44:55')
self.assertEqual(userservice2._vmid, 'test-vm-id')
def test_userservice_methods_after_serialization(self) -> None:
"""
Test user service methods after serialization
"""
userservice = fixtures.create_userservice()
userservice._name = 'test-vm'
userservice._ip = '192.168.1.100'
userservice._mac = '00:11:22:33:44:55'
# Serialize and deserialize
data = pickle.dumps(userservice)
userservice2 = pickle.loads(data)
# Test methods after serialization
self.assertEqual(userservice2.get_name(), 'test-vm')
self.assertEqual(userservice2.get_ip(), '192.168.1.100')
self.assertEqual(userservice2.get_mac(), '00:11:22:33:44:55')
self.assertEqual(userservice2.get_unique_id(), 'test-vm')
def test_userservice_state_after_serialization(self) -> None:
"""
Test user service state after serialization
"""
userservice = fixtures.create_userservice()
userservice._name = 'test-vm'
userservice._reason = 'test-task'
# Serialize and deserialize
data = pickle.dumps(userservice)
userservice2 = pickle.loads(data)
self.assertEqual(userservice2._name, 'test-vm')
self.assertEqual(userservice2._reason, 'test-task')

View File

@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftProviderSerialization(UDSTransactionTestCase):
def test_provider_serialization(self) -> None:
"""
Test provider serialization
"""
provider = fixtures.create_provider()
# Serialize and deserialize
data = pickle.dumps(provider)
provider2 = pickle.loads(data)
self.assertEqual(provider2.cluster_url.value, fixtures.PROVIDER_VALUES_DICT['cluster_url'])
self.assertEqual(provider2.api_url.value, fixtures.PROVIDER_VALUES_DICT['api_url'])
self.assertEqual(provider2.username.value, fixtures.PROVIDER_VALUES_DICT['username'])
self.assertEqual(provider2.password.value, fixtures.PROVIDER_VALUES_DICT['password'])
self.assertEqual(provider2.namespace.value, fixtures.PROVIDER_VALUES_DICT['namespace'])
self.assertEqual(provider2.verify_ssl.value, fixtures.PROVIDER_VALUES_DICT['verify_ssl'])
def test_provider_methods_after_serialization(self) -> None:
"""
Test provider methods after serialization
"""
provider = fixtures.create_provider()
# Serialize and deserialize
data = pickle.dumps(provider)
provider2 = pickle.loads(data)
# Test methods after serialization
self.assertEqual(provider2.get_name(), 'Openshift Provider')
self.assertEqual(provider2.get_description(), 'Openshift Provider')
self.assertEqual(provider2.get_cluster_url(), 'https://oauth-openshift.apps-crc.testing')
self.assertEqual(provider2.get_api_url(), 'https://api.crc.testing:6443')

View File

@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftPublicationSerialization(UDSTransactionTestCase):
def setUp(self) -> None:
fixtures.clear()
def test_publication_serialization(self) -> None:
"""
Test publication serialization
"""
publication = fixtures.create_publication()
publication._name = 'test-template'
publication._reason = 'test-reason'
publication._waiting_name = True
# Serialize and deserialize
data = pickle.dumps(publication)
publication2 = pickle.loads(data)
self.assertEqual(publication2._name, 'test-template')
self.assertEqual(publication2._reason, 'test-reason')
self.assertEqual(publication2._waiting_name, True)
def test_publication_methods_after_serialization(self) -> None:
"""
Test publication methods after serialization
"""
publication = fixtures.create_publication()
publication._name = 'test-template'
# Serialize and deserialize
data = pickle.dumps(publication)
publication2 = pickle.loads(data)
# Test methods after serialization
self.assertEqual(publication2.get_name(), 'test-template')
self.assertEqual(publication2.get_template_id(), 'test-template')

View File

@@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftService(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.clear()
def test_service_data(self) -> None:
"""
Test service data
"""
service = fixtures.create_service()
self.assertEqual(service.template.value, fixtures.SERVICE_VALUES_DICT['template'])
self.assertEqual(service.basename.value, fixtures.SERVICE_VALUES_DICT['basename'])
self.assertEqual(service.lenname.value, fixtures.SERVICE_VALUES_DICT['lenname'])
self.assertEqual(service.publication_timeout.value, fixtures.SERVICE_VALUES_DICT['publication_timeout'])
def test_service_is_available(self) -> None:
"""
Test service availability
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
self.assertTrue(service.is_available())
api.test.assert_called_with()
# Test with cached data
api.test.return_value = False
self.assertTrue(service.is_available())
# Clear cache and test again
service.provider().is_available.cache_clear() # type: ignore
self.assertFalse(service.is_available())
api.test.assert_called_with()
def test_service_methods(self) -> None:
"""
Test service methods
"""
with fixtures.patched_provider() as provider:
service = fixtures.create_service(provider=provider)
# Test basename and lenname
self.assertEqual(service.get_basename(), 'base')
self.assertEqual(service.get_lenname(), 4)
# Test sanitized name
sanitized = service.sanitized_name('Test VM 1')
self.assertEqual(sanitized, 'test-vm-1')
# Test find duplicates
duplicates = list(service.find_duplicates('vm-1', '00:11:22:33:44:55'))
self.assertEqual(len(duplicates), 1)
def test_vm_operations(self) -> None:
"""
Test VM operations
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
# Test get IP
ip = service.get_ip(None, 'vm-1')
self.assertEqual(ip, '192.168.1.1')
# Test get MAC
mac = service.get_mac(None, 'vm-1')
self.assertEqual(mac, '00:11:22:33:44:01')
# Test is running
is_running = service.is_running(None, 'vm-1')
self.assertTrue(is_running)
# Test start/stop/shutdown
service.start(None, 'vm-1')
api.start_vm_instance.assert_called_with('vm-1')
service.stop(None, 'vm-1')
api.stop_vm_instance.assert_called_with('vm-1')
service.shutdown(None, 'vm-1')
api.stop_vm_instance.assert_called_with('vm-1')
def test_vm_deletion(self) -> None:
"""
Test VM deletion
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service(provider=provider)
# Test execute delete
service.execute_delete('vm-1')
api.delete_vm_instance.assert_called_with('vm-1')
# Test is deleted
api.get_vm_info.return_value = None
self.assertTrue(service.is_deleted('vm-1'))
api.get_vm_info.return_value = fixtures.VMS[0]
self.assertFalse(service.is_deleted('vm-1'))

View File

@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftServiceFixed(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.clear()
def test_service_fixed_data(self) -> None:
"""
Test fixed service data
"""
service = fixtures.create_service_fixed()
self.assertEqual(service.token.value, fixtures.SERVICE_FIXED_VALUES_DICT['token'])
self.assertEqual(service.machines.value, fixtures.SERVICE_FIXED_VALUES_DICT['machines'])
self.assertEqual(service.on_logout.value, fixtures.SERVICE_FIXED_VALUES_DICT['on_logout'])
self.assertEqual(service.randomize.value, fixtures.SERVICE_FIXED_VALUES_DICT['randomize'])
self.assertEqual(service.maintain_on_error.value, fixtures.SERVICE_FIXED_VALUES_DICT['maintain_on_error'])
def test_service_fixed_is_available(self) -> None:
"""
Test fixed service availability
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service_fixed(provider=provider)
self.assertTrue(service.is_available())
api.test.assert_called_with()
# Test with cached data
api.test.return_value = False
self.assertTrue(service.is_available())
# Clear cache and test again
service.provider().is_available.cache_clear() # type: ignore
self.assertFalse(service.is_available())
api.test.assert_called_with()
def test_service_fixed_methods(self) -> None:
"""
Test fixed service methods
"""
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
# Test get machines
machines = list(service.enumerate_assignables())
self.assertEqual(len(machines), 3)
self.assertEqual(machines[0], 'vm-3')
self.assertEqual(machines[1], 'vm-4')
self.assertEqual(machines[2], 'vm-5')
# Test get machine name
machine_name = service.get_name('vm-3')
self.assertEqual(machine_name, 'vm-3')
# Test sanitized name
sanitized = service.sanitized_name('Test VM 1')
self.assertEqual(sanitized, 'test-vm-1')
def test_service_fixed_assignment(self) -> None:
"""
Test fixed service assignment
"""
with fixtures.patched_provider() as provider:
service = fixtures.create_service_fixed(provider=provider)
# Test assign from empty
user = fixtures.create_user() # Create a valid User instance
userservice_instance = fixtures.create_userservice_fixed(service=service)
assigned: typing.Optional[str] = service.assign_from_assignables(assignable_id='', user=user, userservice_instance=userservice_instance)
self.assertEqual(assigned, 'vm-3')
# Test with existing assignments
assigned: typing.Optional[str] = service.assign_from_assignables(assignable_id='vm-3', user=user, userservice_instance=userservice_instance)
self.assertEqual(assigned, 'vm-4')
# Test with all assigned
assigned: typing.Optional[str] = service.assign_from_assignables(assignable_id='vm-3,vm-4,vm-5', user=user, userservice_instance=userservice_instance)
self.assertIsNone(assigned)

View File

@@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from unittest import mock
from uds.core import types
from tests.services.openshift import fixtures
from tests.utils.test import UDSTransactionTestCase
class TestOpenshiftUserServiceFixed(UDSTransactionTestCase):
def setUp(self) -> None:
super().setUp()
fixtures.clear()
def test_userservice_fixed_initialization(self) -> None:
"""
Test fixed user service initialization
"""
userservice = fixtures.create_userservice_fixed()
userservice._name = 'vm-3'
self.assertEqual(userservice._name, 'vm-3')
self.assertEqual(userservice.service().type_type, types.services.ServiceType.VDI)
def test_userservice_fixed_lifecycle(self) -> None:
"""
Test fixed user service lifecycle
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service_fixed(provider=provider)
userservice = fixtures.create_userservice_fixed(service=service)
userservice._name = 'vm-3'
# Test initial deployment
fake_user = fixtures.create_user()
state = userservice.deploy_for_user(fake_user)
self.assertEqual(state, types.states.State.RUNNING)
# Test check state when VM is running
api.get_vm_instance_info.return_value = fixtures.VM_INSTANCES[2] # vm-3 is running
state = userservice.check_state()
self.assertEqual(state, types.states.State.RUNNING)
# Test get IP and MAC
ip = userservice.get_ip()
self.assertEqual(ip, '192.168.1.3')
mac = userservice._mac
self.assertEqual(mac, '00:11:22:33:44:03')
def test_userservice_fixed_operations(self) -> None:
"""
Test fixed user service operations
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service_fixed(provider=provider)
userservice = fixtures.create_userservice_fixed(service=service)
userservice._name = 'vm-3'
# Test start operation
userservice.op_start()
api.start_vm_instance.assert_called_with('vm-3')
# Test stop operation
userservice.op_stop()
api.stop_vm_instance.assert_called_with('vm-3')
# Test shutdown operation
userservice.op_shutdown()
api.stop_vm_instance.assert_called_with('vm-3')
def test_userservice_fixed_error_handling(self) -> None:
"""
Test fixed user service error handling
"""
with fixtures.patched_provider() as provider:
api = typing.cast(mock.MagicMock, provider.api)
service = fixtures.create_service_fixed(provider=provider)
userservice = fixtures.create_userservice_fixed(service=service)
userservice._name = 'vm-3'
# Test when VM is not found
api.get_vm_instance_info.return_value = None
state = userservice.check_state()
self.assertEqual(state, types.states.State.ERROR)
# Test error reason
reason = userservice.error_reason()
self.assertIn('not found', reason)
def test_userservice_fixed_serialization(self) -> None:
"""
Test fixed user service serialization
"""
userservice = fixtures.create_userservice_fixed()
userservice._name = 'vm-3'
userservice.set_ip('192.168.1.3')
userservice._mac = '00:11:22:33:44:03'
# Test get name
self.assertEqual(userservice.get_name(), 'vm-3')
# Test get unique id
self.assertEqual(userservice.get_unique_id(), 'vm-3')