From 7566071270ba4471d43e5965829c225c33c2cab1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Wed, 17 Jul 2024 19:41:52 +0200 Subject: [PATCH] Cleaned and done openstack client tests --- .../src/tests/services/openstack/fixtures.py | 32 --- .../tests/services/openstack/test_client.py | 187 +++++++++++++++++- server/src/tests/utils/__init__.py | 8 +- .../services/OpenStack/openstack/client.py | 131 +++--------- .../uds/services/OpenStack/openstack/types.py | 48 +---- 5 files changed, 220 insertions(+), 186 deletions(-) diff --git a/server/src/tests/services/openstack/fixtures.py b/server/src/tests/services/openstack/fixtures.py index 6014ca6c6..8cde3d80f 100644 --- a/server/src/tests/services/openstack/fixtures.py +++ b/server/src/tests/services/openstack/fixtures.py @@ -185,25 +185,6 @@ DEF_NETWORKS_LIST: typing.Final[list[openstack_types.NetworkInfo]] = [ for n in range(1, 16) ] -DEF_PORTS_LIST: typing.Final[list[openstack_types.PortInfo]] = [ - openstack_types.PortInfo( - id=f'portid{n}', - name=f'port name{n}', - status=openstack_types.PortStatus.ACTIVE, - device_id=f'devid{n}', - device_owner=f'devowner{n}', - mac_address=f'fa:{n:02x}:3e:0d:{n+1:02x}:91', - fixed_ips=[ - openstack_types.PortInfo.FixedIpInfo( - ip_address=f'192.168.{j}.1', - subnet_id=random.choice([s.id for s in DEF_SUBNETS_LIST]), - ) - for j in range(1, 4) - ], - ) - for n in range(1, 16) -] - DEF_SECURITY_GROUPS_LIST: typing.Final[list[openstack_types.SecurityGroupInfo]] = [ openstack_types.SecurityGroupInfo( id=f'sgid{n}', @@ -239,7 +220,6 @@ VOLUMES_LIST = copy.deepcopy(DEF_VOLUMES_LIST) VOLUME_SNAPSHOTS_LIST = copy.deepcopy(DEF_VOLUME_SNAPSHOTS_LIST) SUBNETS_LIST = copy.deepcopy(DEF_SUBNETS_LIST) NETWORKS_LIST = copy.deepcopy(DEF_NETWORKS_LIST) -PORTS_LIST = copy.deepcopy(DEF_PORTS_LIST) SECURITY_GROUPS_LIST = copy.deepcopy(DEF_SECURITY_GROUPS_LIST) CONSOLE_CONNECTION_INFO = copy.deepcopy(DEF_CONSOLE_CONNECTION_INFO) @@ -258,7 +238,6 @@ def clear() -> None: VOLUME_SNAPSHOTS_LIST[:] = copy.deepcopy(DEF_VOLUME_SNAPSHOTS_LIST) SUBNETS_LIST[:] = copy.deepcopy(DEF_SUBNETS_LIST) NETWORKS_LIST[:] = copy.deepcopy(DEF_NETWORKS_LIST) - PORTS_LIST[:] = copy.deepcopy(DEF_PORTS_LIST) SECURITY_GROUPS_LIST[:] = copy.deepcopy(DEF_SECURITY_GROUPS_LIST) CONSOLE_CONNECTION_INFO = copy.deepcopy( # pyright: ignore[reportConstantRedefinition] DEF_CONSOLE_CONNECTION_INFO @@ -298,7 +277,6 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [ AutoSpecMethodInfo(client.OpenStackClient.list_servers, returns=SERVERS_LIST), AutoSpecMethodInfo(client.OpenStackClient.list_volumes, returns=VOLUMES_LIST), AutoSpecMethodInfo(client.OpenStackClient.list_networks, returns=NETWORKS_LIST), - AutoSpecMethodInfo(client.OpenStackClient.list_ports, returns=PORTS_LIST), AutoSpecMethodInfo(client.OpenStackClient.list_security_groups, returns=SECURITY_GROUPS_LIST), AutoSpecMethodInfo( client.OpenStackClient.get_server_info, @@ -315,21 +293,11 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [ returns=search_id, partial_args=(VOLUME_SNAPSHOTS_LIST,), ), # pyright: ignore - AutoSpecMethodInfo( - client.OpenStackClient.update_snapshot, - returns=search_id, - partial_args=(VOLUME_SNAPSHOTS_LIST,), - ), AutoSpecMethodInfo( client.OpenStackClient.create_snapshot, returns=random_element, partial_args=(VOLUME_SNAPSHOTS_LIST,), ), - AutoSpecMethodInfo( - client.OpenStackClient.create_volume_from_snapshot, - returns=random_element, - partial_args=(VOLUMES_LIST,), - ), AutoSpecMethodInfo( client.OpenStackClient.create_server_from_snapshot, returns=random_element, diff --git a/server/src/tests/services/openstack/test_client.py b/server/src/tests/services/openstack/test_client.py index 326c33d94..5eb2536eb 100644 --- a/server/src/tests/services/openstack/test_client.py +++ b/server/src/tests/services/openstack/test_client.py @@ -35,6 +35,13 @@ import contextlib import logging import typing +from uds.core.services.generics import exceptions as gen_exceptions + +from tests.utils import vars, helpers +from tests.utils import search_item_by_attr + +from tests.utils.test import UDSTransactionTestCase + from uds.services.OpenStack.openstack import ( types as openstack_types, client as openstack_client, @@ -58,6 +65,8 @@ class TestOpenStackClient(UDSTransactionTestCase): _regionid: str _flavorid: str _networkid: str + _subnetid: str + _security_group_name: str _availability_zone_id: str oclient: openstack_client.OpenStackClient @@ -91,6 +100,8 @@ class TestOpenStackClient(UDSTransactionTestCase): self._regionid = search_item_by_attr(self.oclient.list_regions(), 'name', v['region_name']).id self._flavorid = search_item_by_attr(self.oclient.list_flavors(), 'name', v['flavor_name']).id self._networkid = search_item_by_attr(self.oclient.list_networks(), 'name', v['network_name']).id + self._subnetid = search_item_by_attr(self.oclient.list_subnets(), 'name', v['subnet_name']).id + self._security_group_name = v['security_group_name'] self._availability_zone_id = search_item_by_attr( self.oclient.list_availability_zones(), 'name', v['availability_zone_name'] ).id @@ -165,6 +176,14 @@ class TestOpenStackClient(UDSTransactionTestCase): availability_zone=self._availability_zone_id, ) try: + # Wait for server to be running + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(), + timeout=30, + msg='Timeout waiting for server to be running', + ) + # Reget server info to complete all data + server = self.oclient.get_server_info(server.id) yield server finally: self.oclient.delete_server(server.id) @@ -192,7 +211,6 @@ class TestOpenStackClient(UDSTransactionTestCase): (server2.id, server2.flavor), [(s.id, s.flavor) for s in servers], ) - def test_list_volumes(self) -> None: with self.create_test_volume() as volume: @@ -217,3 +235,170 @@ class TestOpenStackClient(UDSTransactionTestCase): self.get_client(use_project_id=False) with self.assertRaises(Exception): self.oclient.list_volumes() + + def test_list_availability_zones(self) -> None: + availability_zones = self.oclient.list_availability_zones() + self.assertGreaterEqual(len(availability_zones), 1) + self.assertIn(self._availability_zone_id, [az.id for az in availability_zones]) + + def test_list_flavors(self) -> None: + flavors = self.oclient.list_flavors() + self.assertGreaterEqual(len(flavors), 1) + self.assertIn(self._flavorid, [f.id for f in flavors]) + + def test_list_networks(self) -> None: + networks = self.oclient.list_networks() + self.assertGreaterEqual(len(networks), 1) + self.assertIn(self._networkid, [n.id for n in networks]) + + def test_list_subnets(self) -> None: + subnets = self.oclient.list_subnets() + self.assertGreaterEqual(len(subnets), 1) + self.assertIn(self._subnetid, [s.id for s in subnets]) + + def test_list_security_groups(self) -> None: + security_groups = self.oclient.list_security_groups() + self.assertGreaterEqual(len(security_groups), 1) + self.assertIn(self._security_group_name, [sg.name for sg in security_groups]) + + def test_get_server_info(self) -> None: + with self.create_test_server() as server: + server_info = self.oclient.get_server_info(server.id) + self.assertEqual(server.id, server_info.id) + self.assertEqual(server.name, server_info.name) + self.assertEqual(server.flavor, server_info.flavor) + + # Trying to get a non existing server should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.get_server_info('non-existing-server') + + def test_get_volume_info(self) -> None: + with self.create_test_volume() as volume: + volume_info = self.oclient.get_volume_info(volume.id) + self.assertEqual(volume.id, volume_info.id) + self.assertEqual(volume.name, volume_info.name) + self.assertEqual(volume.description, volume_info.description) + + # Trying to get a non existing volume should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.get_volume_info('non-existing-volume') + + def test_get_snapshot_info(self) -> None: + with self.create_test_volume() as volume: + with self.create_test_snapshot(volume) as snapshot: + snapshot_info = self.oclient.get_snapshot_info(snapshot.id) + self.assertEqual(snapshot.id, snapshot_info.id) + self.assertEqual(snapshot.name, snapshot_info.name) + + # Trying to get a non existing snapshot should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.get_snapshot_info('non-existing-snapshot') + + def test_create_snapshot(self) -> None: + # Note: create snapshot is used on test_create_server_from_snapshot + # and it's already tested with test_create_server_from_snapshot, so we just test the exceptions here + + # Trying to create a snapshot from a non existing volume should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.create_snapshot(volume_id='non-existing-volume', name='non-existing-snapshot') + + def test_create_server_from_snapshot(self) -> None: + with self.create_test_server() as server: + self.assertIsNotNone(server.id) + + # Trying to create a server from a non existing snapshot should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.create_server_from_snapshot( + snapshot_id='non-existing-snapshot', + name='non-existing-server', + flavor_id=self._flavorid, + network_id=self._networkid, + security_groups_names=[], + availability_zone=self._availability_zone_id, + ) + + def test_delete_server(self) -> None: + # delete_server is tested on test_create_server_from_snapshot and test_list_servers at least + # so we just test the exceptions here + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.delete_server('non-existing-server') + + def test_delete_snapshot(self) -> None: + # delete_snapshot is tested on test_create_snapshot at least + # so we just test the exceptions here + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.delete_snapshot('non-existing-snapshot') + + def test_operations_server(self) -> None: + with self.create_test_server() as server: + # Server is already running, first stop it + self.oclient.stop_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_stopped(), + timeout=30, + msg='Timeout waiting for server to be stopped', + ) + + self.oclient.start_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(), + timeout=30, + msg='Timeout waiting for server to be running', + ) + + self.oclient.reset_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).status.is_active(), + timeout=30, + msg='Timeout waiting for server to be running', + ) + + # Suspend + self.oclient.suspend_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_suspended(), + timeout=30, + msg='Timeout waiting for server to be suspended', + ) + + # Resume + self.oclient.resume_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(), + timeout=30, + msg='Timeout waiting for server to be running', + ) + + # Reboot + self.oclient.reboot_server(server.id) + helpers.waiter( + lambda: self.oclient.get_server_info(server.id, force=True).status.is_active(), + timeout=30, + msg='Timeout waiting for server to be running', + ) + + def test_operations_fail_server(self) -> None: + # Trying the operations on a non existing server should raise an exceptions.NotFoundException + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.start_server('non-existing-server') + + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.stop_server('non-existing-server') + + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.reset_server('non-existing-server') + + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.suspend_server('non-existing-server') + + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.resume_server('non-existing-server') + + with self.assertRaises(gen_exceptions.NotFoundError): + self.oclient.reboot_server('non-existing-server') + + def test_test_connection(self) -> None: + self.assertTrue(self.oclient.test_connection()) + + def test_is_available(self) -> None: + self.assertTrue(self.oclient.is_available()) \ No newline at end of file diff --git a/server/src/tests/utils/__init__.py b/server/src/tests/utils/__init__.py index 272ef3f67..e131a92f0 100644 --- a/server/src/tests/utils/__init__.py +++ b/server/src/tests/utils/__init__.py @@ -165,7 +165,7 @@ class MustBeOfType: return self.__str__() -def search_item_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwargs: typing.Any) -> T: +def search_item_by_attr(lst: collections.abc.Iterable[T], attribute: str, value: typing.Any, **kwargs: typing.Any) -> T: """ Returns an item from a list of items kwargs are not used, just to let use it as partial on fixtures @@ -176,7 +176,7 @@ def search_item_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwarg raise ValueError(f'Item with {attribute}=="{value}" not found in list {str(lst)[:100]}') -def search_dict_by_attr(lst: list[V], attribute: str, value: typing.Any, **kwargs: typing.Any) -> V: +def search_dict_by_attr(lst: collections.abc.Iterable[V], attribute: str, value: typing.Any, **kwargs: typing.Any) -> V: """ Returns an item from a list of items kwargs are not used, just to let use it as partial on fixtures @@ -188,7 +188,7 @@ def search_dict_by_attr(lst: list[V], attribute: str, value: typing.Any, **kwarg def filter_list_by_attr( - lst: list[T], attribute: str, value: typing.Any, *, sorted_by: str = '', **kwargs: typing.Any + lst: collections.abc.Iterable[T], attribute: str, value: typing.Any, *, sorted_by: str = '', **kwargs: typing.Any ) -> list[T]: """ Returns a list of items from a list of items @@ -201,7 +201,7 @@ def filter_list_by_attr( def filter_list_by_attr_list( - lst: list[T], attribute: str, values: list[typing.Any], *, sorted_by: str = '', **kwargs: typing.Any + lst: collections.abc.Iterable[T], attribute: str, values: list[typing.Any], *, sorted_by: str = '', **kwargs: typing.Any ) -> list[T]: """ Returns a list of items from a list of items diff --git a/server/src/uds/services/OpenStack/openstack/client.py b/server/src/uds/services/OpenStack/openstack/client.py index 19577e963..13f09be5e 100644 --- a/server/src/uds/services/OpenStack/openstack/client.py +++ b/server/src/uds/services/OpenStack/openstack/client.py @@ -41,7 +41,7 @@ from django.utils.translation import gettext as _ from uds.core import consts -from uds.core.services.generics import exceptions +from uds.core.services.generics import exceptions as gen_exceptions from uds.core.util import security, cache, decorators from . import types as openstack_types @@ -256,13 +256,15 @@ class OpenStackClient: # pylint: disable=too-many-public-methods OpenStackClient._ensure_valid_response(r, error_message, expects_json=expects_json) logger.debug('Result: %s', r.content) return r - except exceptions.NotFoundError: + except gen_exceptions.NotFoundError: raise except Exception as e: if i == len(found_endpoints) - 1: # Endpoint is down, can retry if none is working if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)): - raise exceptions.RetryableError('All endpoints failed') from e # With last exception + raise gen_exceptions.RetryableError( + 'All endpoints failed' + ) from e # With last exception raise e logger.warning('Error requesting %s: %s', endpoint + path, e) self.cache.remove(cache_key) @@ -304,7 +306,9 @@ class OpenStackClient: # pylint: disable=too-many-public-methods if i == len(found_endpoints) - 1: # Endpoint is down, can retry if none is working if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)): - raise exceptions.RetryableError('All endpoints failed') from e # With last exception + raise gen_exceptions.RetryableError( + 'All endpoints failed' + ) from e # With last exception raise e logger.warning('Error requesting %s: %s (%s)', endpoint + path, e, error_message) self.cache.remove(cache_key) @@ -455,23 +459,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods ) ] - @decorators.cached(prefix='snps', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper) - def list_volume_snapshots( - self, volume_id: typing.Optional[str] = None - ) -> list[openstack_types.SnapshotInfo]: - path = '/snapshots' - if volume_id is not None: - path += f'?volume_id={volume_id}' - return [ - openstack_types.SnapshotInfo.from_dict(snapshot) - for snapshot in self._get_recurring_from_endpoint( - endpoint_types=VOLUMES_ENDPOINT_TYPES, - path=path, - error_message='List snapshots', - key='snapshots', - ) - ] - @decorators.cached(prefix='azs', timeout=consts.cache.EXTREME_CACHE_TIMEOUT, key_helper=cache_key_helper) def list_availability_zones(self) -> list[openstack_types.AvailabilityZoneInfo]: # Only available zones are returned @@ -523,7 +510,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods return res @decorators.cached(prefix='subns', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) - def list_subnets(self) -> collections.abc.Iterable[openstack_types.SubnetInfo]: + def list_subnets(self) -> list[openstack_types.SubnetInfo]: return [ openstack_types.SubnetInfo.from_dict(s) for s in self._get_recurring_from_endpoint( @@ -534,29 +521,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods ) ] - @decorators.cached(prefix='sgps', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) - def list_ports( - self, - network_id: typing.Optional[str] = None, - owner_id: typing.Optional[str] = None, - ) -> list[openstack_types.PortInfo]: - params: dict[str, typing.Any] = {} - if network_id is not None: - params['network_id'] = network_id - if owner_id is not None: - params['device_owner'] = owner_id - - return [ - openstack_types.PortInfo.from_dict(p) - for p in self._get_recurring_from_endpoint( - endpoint_types=NETWORKS_ENDPOINT_TYPES, - path='/v2.0/ports', - error_message='List ports', - key='ports', - params=params, - ) - ] - @decorators.cached(prefix='sgps', timeout=consts.cache.EXTREME_CACHE_TIMEOUT, key_helper=cache_key_helper) def list_security_groups(self) -> list[openstack_types.SecurityGroupInfo]: return [ @@ -573,7 +537,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods # Any cache time less than 5 seconds will be fine, beceuse checks on # openstack are done every 5 seconds @decorators.cached(prefix='svr', timeout=consts.cache.SHORTEST_CACHE_TIMEOUT, key_helper=cache_key_helper) - def get_server_info(self, server_id: str) -> openstack_types.ServerInfo: + def get_server_info(self, server_id: str, **kwargs: typing.Any) -> openstack_types.ServerInfo: r = self._request_from_endpoint( 'get', endpoints_types=COMPUTE_ENDPOINT_TYPES, @@ -607,32 +571,12 @@ class OpenStackClient: # pylint: disable=too-many-public-methods return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot']) - def update_snapshot( - self, - snapshot_id: str, - name: typing.Optional[str] = None, - description: typing.Optional[str] = None, - ) -> openstack_types.SnapshotInfo: - data: dict[str, typing.Any] = {'snapshot': {}} - if name: - data['snapshot']['name'] = name - - if description: - data['snapshot']['description'] = description - - r = self._request_from_endpoint( - 'put', - endpoints_types=VOLUMES_ENDPOINT_TYPES, - path=f'/snapshots/{snapshot_id}', - data=json.dumps(data), - error_message='Update Snaphost information', - ) - - return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot']) - def create_snapshot( self, volume_id: str, name: str, description: typing.Optional[str] = None ) -> openstack_types.SnapshotInfo: + # First, get volume info to ensure it exists + self.get_volume_info(volume_id, force=True) + description = description or 'UDS Snapshot' data = { 'snapshot': { @@ -653,29 +597,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot']) - def create_volume_from_snapshot( - self, snapshot_id: str, name: str, description: typing.Optional[str] = None - ) -> openstack_types.VolumeInfo: - description = description or 'UDS Volume' - data = { - 'volume': { - 'name': name, - 'description': description, - # 'volume_type': volType, # This seems to be the volume type name, not the id - 'snapshot_id': snapshot_id, - } - } - - r = self._request_from_endpoint( - 'post', - endpoints_types=VOLUMES_ENDPOINT_TYPES, - path='/volumes', - data=json.dumps(data), - error_message='Create Volume from Snapshot', - ) - - return openstack_types.VolumeInfo.from_dict(r.json()['volume']) - def create_server_from_snapshot( self, snapshot_id: str, @@ -686,6 +607,9 @@ class OpenStackClient: # pylint: disable=too-many-public-methods security_groups_names: collections.abc.Iterable[str], count: int = 1, ) -> openstack_types.ServerInfo: + # Check snapshot exists + self.get_snapshot_info(snapshot_id) + data = { 'server': { 'name': name, @@ -782,9 +706,15 @@ class OpenStackClient: # pylint: disable=too-many-public-methods error_message='Rebooting server', expects_json=False, ) + except gen_exceptions.NotFoundError: + raise except Exception: pass + def reset_server(self, server_id: str) -> None: + # Does not need return value + return self.reboot_server(server_id, hard=True) + def suspend_server(self, server_id: str) -> None: # this does not returns anything self._request_from_endpoint( @@ -807,21 +737,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods expects_json=False, ) - def reset_server(self, server_id: str, hard: bool = True) -> None: - # Does not need return value - try: - type_reboot = 'HARD' if hard else 'SOFT' - self._request_from_endpoint( - 'post', - endpoints_types=COMPUTE_ENDPOINT_TYPES, - path=f'/servers/{server_id}/action', - data='{"reboot":{"type":"' + type_reboot + '"}}', - error_message='Resetting server', - expects_json=False, - ) - except Exception: - pass # Ignore error for reseting server - def test_connection(self) -> bool: # First, ensure requested api is supported # We need api version 3.2 or greater @@ -905,7 +820,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods ) -> None: if response.ok is False: if response.status_code == 404: - raise exceptions.NotFoundError('Not found') + raise gen_exceptions.NotFoundError('Not found') try: # Extract any key, in case of error is expected to have only one top key so this will work _, err = response.json().popitem() diff --git a/server/src/uds/services/OpenStack/openstack/types.py b/server/src/uds/services/OpenStack/openstack/types.py index f04467473..cb026c92d 100644 --- a/server/src/uds/services/OpenStack/openstack/types.py +++ b/server/src/uds/services/OpenStack/openstack/types.py @@ -95,6 +95,9 @@ class ServerStatus(enum.StrEnum): def is_paused(self) -> bool: return self in [ServerStatus.PAUSED, ServerStatus.SUSPENDED] + + def is_active(self) -> bool: + return self == ServerStatus.ACTIVE def is_running(self) -> bool: return self in [ @@ -130,6 +133,9 @@ class PowerState(enum.IntEnum): def is_paused(self) -> bool: return self == PowerState.PAUSED + + def is_suspended(self) -> bool: + return self == PowerState.SUSPENDED def is_running(self) -> bool: return self == PowerState.RUNNING @@ -302,10 +308,7 @@ class ServerInfo: except Exception: pass # Just ignore any error here # Try to get flavor, only on >= 2.47 - try: - flavor = d.get('flavor', {}).get('id', '') - except Exception: - flavor = '' + flavor = d.get('flavor', {}).get('id', '') return ServerInfo( id=d['id'], name=d.get('name', d['id']), # On create server, name is not returned, so use id @@ -511,43 +514,6 @@ class SubnetInfo: network_id=d['network_id'], ) - -@dataclasses.dataclass -class PortInfo: - - @dataclasses.dataclass - class FixedIpInfo: - ip_address: str - subnet_id: str - - @staticmethod - def from_dict(d: dict[str, typing.Any]) -> 'PortInfo.FixedIpInfo': - return PortInfo.FixedIpInfo( - ip_address=d['ip_address'], - subnet_id=d['subnet_id'], - ) - - id: str - name: str - status: str - device_id: str - device_owner: str - mac_address: str - fixed_ips: list['FixedIpInfo'] - - @staticmethod - def from_dict(d: dict[str, typing.Any]) -> 'PortInfo': - return PortInfo( - id=d['id'], - name=d['name'], - status=d['status'], - device_id=d['device_id'], - device_owner=d['device_owner'], - mac_address=d['mac_address'], - fixed_ips=[PortInfo.FixedIpInfo.from_dict(ip) for ip in d['fixed_ips']], - ) - - @dataclasses.dataclass class SecurityGroupInfo: id: str