1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Cleaned and done openstack client tests

This commit is contained in:
Adolfo Gómez García 2024-07-17 19:41:52 +02:00
parent 9d51963903
commit 7566071270
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 220 additions and 186 deletions

View File

@ -185,25 +185,6 @@ DEF_NETWORKS_LIST: typing.Final[list[openstack_types.NetworkInfo]] = [
for n in range(1, 16)
]
DEF_PORTS_LIST: typing.Final[list[openstack_types.PortInfo]] = [
openstack_types.PortInfo(
id=f'portid{n}',
name=f'port name{n}',
status=openstack_types.PortStatus.ACTIVE,
device_id=f'devid{n}',
device_owner=f'devowner{n}',
mac_address=f'fa:{n:02x}:3e:0d:{n+1:02x}:91',
fixed_ips=[
openstack_types.PortInfo.FixedIpInfo(
ip_address=f'192.168.{j}.1',
subnet_id=random.choice([s.id for s in DEF_SUBNETS_LIST]),
)
for j in range(1, 4)
],
)
for n in range(1, 16)
]
DEF_SECURITY_GROUPS_LIST: typing.Final[list[openstack_types.SecurityGroupInfo]] = [
openstack_types.SecurityGroupInfo(
id=f'sgid{n}',
@ -239,7 +220,6 @@ VOLUMES_LIST = copy.deepcopy(DEF_VOLUMES_LIST)
VOLUME_SNAPSHOTS_LIST = copy.deepcopy(DEF_VOLUME_SNAPSHOTS_LIST)
SUBNETS_LIST = copy.deepcopy(DEF_SUBNETS_LIST)
NETWORKS_LIST = copy.deepcopy(DEF_NETWORKS_LIST)
PORTS_LIST = copy.deepcopy(DEF_PORTS_LIST)
SECURITY_GROUPS_LIST = copy.deepcopy(DEF_SECURITY_GROUPS_LIST)
CONSOLE_CONNECTION_INFO = copy.deepcopy(DEF_CONSOLE_CONNECTION_INFO)
@ -258,7 +238,6 @@ def clear() -> None:
VOLUME_SNAPSHOTS_LIST[:] = copy.deepcopy(DEF_VOLUME_SNAPSHOTS_LIST)
SUBNETS_LIST[:] = copy.deepcopy(DEF_SUBNETS_LIST)
NETWORKS_LIST[:] = copy.deepcopy(DEF_NETWORKS_LIST)
PORTS_LIST[:] = copy.deepcopy(DEF_PORTS_LIST)
SECURITY_GROUPS_LIST[:] = copy.deepcopy(DEF_SECURITY_GROUPS_LIST)
CONSOLE_CONNECTION_INFO = copy.deepcopy( # pyright: ignore[reportConstantRedefinition]
DEF_CONSOLE_CONNECTION_INFO
@ -298,7 +277,6 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
AutoSpecMethodInfo(client.OpenStackClient.list_servers, returns=SERVERS_LIST),
AutoSpecMethodInfo(client.OpenStackClient.list_volumes, returns=VOLUMES_LIST),
AutoSpecMethodInfo(client.OpenStackClient.list_networks, returns=NETWORKS_LIST),
AutoSpecMethodInfo(client.OpenStackClient.list_ports, returns=PORTS_LIST),
AutoSpecMethodInfo(client.OpenStackClient.list_security_groups, returns=SECURITY_GROUPS_LIST),
AutoSpecMethodInfo(
client.OpenStackClient.get_server_info,
@ -315,21 +293,11 @@ CLIENT_METHODS_INFO: typing.Final[list[AutoSpecMethodInfo]] = [
returns=search_id,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
), # pyright: ignore
AutoSpecMethodInfo(
client.OpenStackClient.update_snapshot,
returns=search_id,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
),
AutoSpecMethodInfo(
client.OpenStackClient.create_snapshot,
returns=random_element,
partial_args=(VOLUME_SNAPSHOTS_LIST,),
),
AutoSpecMethodInfo(
client.OpenStackClient.create_volume_from_snapshot,
returns=random_element,
partial_args=(VOLUMES_LIST,),
),
AutoSpecMethodInfo(
client.OpenStackClient.create_server_from_snapshot,
returns=random_element,

View File

@ -35,6 +35,13 @@ import contextlib
import logging
import typing
from uds.core.services.generics import exceptions as gen_exceptions
from tests.utils import vars, helpers
from tests.utils import search_item_by_attr
from tests.utils.test import UDSTransactionTestCase
from uds.services.OpenStack.openstack import (
types as openstack_types,
client as openstack_client,
@ -58,6 +65,8 @@ class TestOpenStackClient(UDSTransactionTestCase):
_regionid: str
_flavorid: str
_networkid: str
_subnetid: str
_security_group_name: str
_availability_zone_id: str
oclient: openstack_client.OpenStackClient
@ -91,6 +100,8 @@ class TestOpenStackClient(UDSTransactionTestCase):
self._regionid = search_item_by_attr(self.oclient.list_regions(), 'name', v['region_name']).id
self._flavorid = search_item_by_attr(self.oclient.list_flavors(), 'name', v['flavor_name']).id
self._networkid = search_item_by_attr(self.oclient.list_networks(), 'name', v['network_name']).id
self._subnetid = search_item_by_attr(self.oclient.list_subnets(), 'name', v['subnet_name']).id
self._security_group_name = v['security_group_name']
self._availability_zone_id = search_item_by_attr(
self.oclient.list_availability_zones(), 'name', v['availability_zone_name']
).id
@ -165,6 +176,14 @@ class TestOpenStackClient(UDSTransactionTestCase):
availability_zone=self._availability_zone_id,
)
try:
# Wait for server to be running
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(),
timeout=30,
msg='Timeout waiting for server to be running',
)
# Reget server info to complete all data
server = self.oclient.get_server_info(server.id)
yield server
finally:
self.oclient.delete_server(server.id)
@ -192,7 +211,6 @@ class TestOpenStackClient(UDSTransactionTestCase):
(server2.id, server2.flavor),
[(s.id, s.flavor) for s in servers],
)
def test_list_volumes(self) -> None:
with self.create_test_volume() as volume:
@ -217,3 +235,170 @@ class TestOpenStackClient(UDSTransactionTestCase):
self.get_client(use_project_id=False)
with self.assertRaises(Exception):
self.oclient.list_volumes()
def test_list_availability_zones(self) -> None:
availability_zones = self.oclient.list_availability_zones()
self.assertGreaterEqual(len(availability_zones), 1)
self.assertIn(self._availability_zone_id, [az.id for az in availability_zones])
def test_list_flavors(self) -> None:
flavors = self.oclient.list_flavors()
self.assertGreaterEqual(len(flavors), 1)
self.assertIn(self._flavorid, [f.id for f in flavors])
def test_list_networks(self) -> None:
networks = self.oclient.list_networks()
self.assertGreaterEqual(len(networks), 1)
self.assertIn(self._networkid, [n.id for n in networks])
def test_list_subnets(self) -> None:
subnets = self.oclient.list_subnets()
self.assertGreaterEqual(len(subnets), 1)
self.assertIn(self._subnetid, [s.id for s in subnets])
def test_list_security_groups(self) -> None:
security_groups = self.oclient.list_security_groups()
self.assertGreaterEqual(len(security_groups), 1)
self.assertIn(self._security_group_name, [sg.name for sg in security_groups])
def test_get_server_info(self) -> None:
with self.create_test_server() as server:
server_info = self.oclient.get_server_info(server.id)
self.assertEqual(server.id, server_info.id)
self.assertEqual(server.name, server_info.name)
self.assertEqual(server.flavor, server_info.flavor)
# Trying to get a non existing server should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.get_server_info('non-existing-server')
def test_get_volume_info(self) -> None:
with self.create_test_volume() as volume:
volume_info = self.oclient.get_volume_info(volume.id)
self.assertEqual(volume.id, volume_info.id)
self.assertEqual(volume.name, volume_info.name)
self.assertEqual(volume.description, volume_info.description)
# Trying to get a non existing volume should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.get_volume_info('non-existing-volume')
def test_get_snapshot_info(self) -> None:
with self.create_test_volume() as volume:
with self.create_test_snapshot(volume) as snapshot:
snapshot_info = self.oclient.get_snapshot_info(snapshot.id)
self.assertEqual(snapshot.id, snapshot_info.id)
self.assertEqual(snapshot.name, snapshot_info.name)
# Trying to get a non existing snapshot should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.get_snapshot_info('non-existing-snapshot')
def test_create_snapshot(self) -> None:
# Note: create snapshot is used on test_create_server_from_snapshot
# and it's already tested with test_create_server_from_snapshot, so we just test the exceptions here
# Trying to create a snapshot from a non existing volume should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.create_snapshot(volume_id='non-existing-volume', name='non-existing-snapshot')
def test_create_server_from_snapshot(self) -> None:
with self.create_test_server() as server:
self.assertIsNotNone(server.id)
# Trying to create a server from a non existing snapshot should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.create_server_from_snapshot(
snapshot_id='non-existing-snapshot',
name='non-existing-server',
flavor_id=self._flavorid,
network_id=self._networkid,
security_groups_names=[],
availability_zone=self._availability_zone_id,
)
def test_delete_server(self) -> None:
# delete_server is tested on test_create_server_from_snapshot and test_list_servers at least
# so we just test the exceptions here
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.delete_server('non-existing-server')
def test_delete_snapshot(self) -> None:
# delete_snapshot is tested on test_create_snapshot at least
# so we just test the exceptions here
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.delete_snapshot('non-existing-snapshot')
def test_operations_server(self) -> None:
with self.create_test_server() as server:
# Server is already running, first stop it
self.oclient.stop_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_stopped(),
timeout=30,
msg='Timeout waiting for server to be stopped',
)
self.oclient.start_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(),
timeout=30,
msg='Timeout waiting for server to be running',
)
self.oclient.reset_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).status.is_active(),
timeout=30,
msg='Timeout waiting for server to be running',
)
# Suspend
self.oclient.suspend_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_suspended(),
timeout=30,
msg='Timeout waiting for server to be suspended',
)
# Resume
self.oclient.resume_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).power_state.is_running(),
timeout=30,
msg='Timeout waiting for server to be running',
)
# Reboot
self.oclient.reboot_server(server.id)
helpers.waiter(
lambda: self.oclient.get_server_info(server.id, force=True).status.is_active(),
timeout=30,
msg='Timeout waiting for server to be running',
)
def test_operations_fail_server(self) -> None:
# Trying the operations on a non existing server should raise an exceptions.NotFoundException
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.start_server('non-existing-server')
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.stop_server('non-existing-server')
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.reset_server('non-existing-server')
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.suspend_server('non-existing-server')
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.resume_server('non-existing-server')
with self.assertRaises(gen_exceptions.NotFoundError):
self.oclient.reboot_server('non-existing-server')
def test_test_connection(self) -> None:
self.assertTrue(self.oclient.test_connection())
def test_is_available(self) -> None:
self.assertTrue(self.oclient.is_available())

View File

@ -165,7 +165,7 @@ class MustBeOfType:
return self.__str__()
def search_item_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwargs: typing.Any) -> T:
def search_item_by_attr(lst: collections.abc.Iterable[T], attribute: str, value: typing.Any, **kwargs: typing.Any) -> T:
"""
Returns an item from a list of items
kwargs are not used, just to let use it as partial on fixtures
@ -176,7 +176,7 @@ def search_item_by_attr(lst: list[T], attribute: str, value: typing.Any, **kwarg
raise ValueError(f'Item with {attribute}=="{value}" not found in list {str(lst)[:100]}')
def search_dict_by_attr(lst: list[V], attribute: str, value: typing.Any, **kwargs: typing.Any) -> V:
def search_dict_by_attr(lst: collections.abc.Iterable[V], attribute: str, value: typing.Any, **kwargs: typing.Any) -> V:
"""
Returns an item from a list of items
kwargs are not used, just to let use it as partial on fixtures
@ -188,7 +188,7 @@ def search_dict_by_attr(lst: list[V], attribute: str, value: typing.Any, **kwarg
def filter_list_by_attr(
lst: list[T], attribute: str, value: typing.Any, *, sorted_by: str = '', **kwargs: typing.Any
lst: collections.abc.Iterable[T], attribute: str, value: typing.Any, *, sorted_by: str = '', **kwargs: typing.Any
) -> list[T]:
"""
Returns a list of items from a list of items
@ -201,7 +201,7 @@ def filter_list_by_attr(
def filter_list_by_attr_list(
lst: list[T], attribute: str, values: list[typing.Any], *, sorted_by: str = '', **kwargs: typing.Any
lst: collections.abc.Iterable[T], attribute: str, values: list[typing.Any], *, sorted_by: str = '', **kwargs: typing.Any
) -> list[T]:
"""
Returns a list of items from a list of items

View File

@ -41,7 +41,7 @@ from django.utils.translation import gettext as _
from uds.core import consts
from uds.core.services.generics import exceptions
from uds.core.services.generics import exceptions as gen_exceptions
from uds.core.util import security, cache, decorators
from . import types as openstack_types
@ -256,13 +256,15 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
OpenStackClient._ensure_valid_response(r, error_message, expects_json=expects_json)
logger.debug('Result: %s', r.content)
return r
except exceptions.NotFoundError:
except gen_exceptions.NotFoundError:
raise
except Exception as e:
if i == len(found_endpoints) - 1:
# Endpoint is down, can retry if none is working
if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)):
raise exceptions.RetryableError('All endpoints failed') from e # With last exception
raise gen_exceptions.RetryableError(
'All endpoints failed'
) from e # With last exception
raise e
logger.warning('Error requesting %s: %s', endpoint + path, e)
self.cache.remove(cache_key)
@ -304,7 +306,9 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
if i == len(found_endpoints) - 1:
# Endpoint is down, can retry if none is working
if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)):
raise exceptions.RetryableError('All endpoints failed') from e # With last exception
raise gen_exceptions.RetryableError(
'All endpoints failed'
) from e # With last exception
raise e
logger.warning('Error requesting %s: %s (%s)', endpoint + path, e, error_message)
self.cache.remove(cache_key)
@ -455,23 +459,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
)
]
@decorators.cached(prefix='snps', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_volume_snapshots(
self, volume_id: typing.Optional[str] = None
) -> list[openstack_types.SnapshotInfo]:
path = '/snapshots'
if volume_id is not None:
path += f'?volume_id={volume_id}'
return [
openstack_types.SnapshotInfo.from_dict(snapshot)
for snapshot in self._get_recurring_from_endpoint(
endpoint_types=VOLUMES_ENDPOINT_TYPES,
path=path,
error_message='List snapshots',
key='snapshots',
)
]
@decorators.cached(prefix='azs', timeout=consts.cache.EXTREME_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_availability_zones(self) -> list[openstack_types.AvailabilityZoneInfo]:
# Only available zones are returned
@ -523,7 +510,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
return res
@decorators.cached(prefix='subns', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_subnets(self) -> collections.abc.Iterable[openstack_types.SubnetInfo]:
def list_subnets(self) -> list[openstack_types.SubnetInfo]:
return [
openstack_types.SubnetInfo.from_dict(s)
for s in self._get_recurring_from_endpoint(
@ -534,29 +521,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
)
]
@decorators.cached(prefix='sgps', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_ports(
self,
network_id: typing.Optional[str] = None,
owner_id: typing.Optional[str] = None,
) -> list[openstack_types.PortInfo]:
params: dict[str, typing.Any] = {}
if network_id is not None:
params['network_id'] = network_id
if owner_id is not None:
params['device_owner'] = owner_id
return [
openstack_types.PortInfo.from_dict(p)
for p in self._get_recurring_from_endpoint(
endpoint_types=NETWORKS_ENDPOINT_TYPES,
path='/v2.0/ports',
error_message='List ports',
key='ports',
params=params,
)
]
@decorators.cached(prefix='sgps', timeout=consts.cache.EXTREME_CACHE_TIMEOUT, key_helper=cache_key_helper)
def list_security_groups(self) -> list[openstack_types.SecurityGroupInfo]:
return [
@ -573,7 +537,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
# Any cache time less than 5 seconds will be fine, beceuse checks on
# openstack are done every 5 seconds
@decorators.cached(prefix='svr', timeout=consts.cache.SHORTEST_CACHE_TIMEOUT, key_helper=cache_key_helper)
def get_server_info(self, server_id: str) -> openstack_types.ServerInfo:
def get_server_info(self, server_id: str, **kwargs: typing.Any) -> openstack_types.ServerInfo:
r = self._request_from_endpoint(
'get',
endpoints_types=COMPUTE_ENDPOINT_TYPES,
@ -607,32 +571,12 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot'])
def update_snapshot(
self,
snapshot_id: str,
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
) -> openstack_types.SnapshotInfo:
data: dict[str, typing.Any] = {'snapshot': {}}
if name:
data['snapshot']['name'] = name
if description:
data['snapshot']['description'] = description
r = self._request_from_endpoint(
'put',
endpoints_types=VOLUMES_ENDPOINT_TYPES,
path=f'/snapshots/{snapshot_id}',
data=json.dumps(data),
error_message='Update Snaphost information',
)
return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot'])
def create_snapshot(
self, volume_id: str, name: str, description: typing.Optional[str] = None
) -> openstack_types.SnapshotInfo:
# First, get volume info to ensure it exists
self.get_volume_info(volume_id, force=True)
description = description or 'UDS Snapshot'
data = {
'snapshot': {
@ -653,29 +597,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
return openstack_types.SnapshotInfo.from_dict(r.json()['snapshot'])
def create_volume_from_snapshot(
self, snapshot_id: str, name: str, description: typing.Optional[str] = None
) -> openstack_types.VolumeInfo:
description = description or 'UDS Volume'
data = {
'volume': {
'name': name,
'description': description,
# 'volume_type': volType, # This seems to be the volume type name, not the id
'snapshot_id': snapshot_id,
}
}
r = self._request_from_endpoint(
'post',
endpoints_types=VOLUMES_ENDPOINT_TYPES,
path='/volumes',
data=json.dumps(data),
error_message='Create Volume from Snapshot',
)
return openstack_types.VolumeInfo.from_dict(r.json()['volume'])
def create_server_from_snapshot(
self,
snapshot_id: str,
@ -686,6 +607,9 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
security_groups_names: collections.abc.Iterable[str],
count: int = 1,
) -> openstack_types.ServerInfo:
# Check snapshot exists
self.get_snapshot_info(snapshot_id)
data = {
'server': {
'name': name,
@ -782,9 +706,15 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
error_message='Rebooting server',
expects_json=False,
)
except gen_exceptions.NotFoundError:
raise
except Exception:
pass
def reset_server(self, server_id: str) -> None:
# Does not need return value
return self.reboot_server(server_id, hard=True)
def suspend_server(self, server_id: str) -> None:
# this does not returns anything
self._request_from_endpoint(
@ -807,21 +737,6 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
expects_json=False,
)
def reset_server(self, server_id: str, hard: bool = True) -> None:
# Does not need return value
try:
type_reboot = 'HARD' if hard else 'SOFT'
self._request_from_endpoint(
'post',
endpoints_types=COMPUTE_ENDPOINT_TYPES,
path=f'/servers/{server_id}/action',
data='{"reboot":{"type":"' + type_reboot + '"}}',
error_message='Resetting server',
expects_json=False,
)
except Exception:
pass # Ignore error for reseting server
def test_connection(self) -> bool:
# First, ensure requested api is supported
# We need api version 3.2 or greater
@ -905,7 +820,7 @@ class OpenStackClient: # pylint: disable=too-many-public-methods
) -> None:
if response.ok is False:
if response.status_code == 404:
raise exceptions.NotFoundError('Not found')
raise gen_exceptions.NotFoundError('Not found')
try:
# Extract any key, in case of error is expected to have only one top key so this will work
_, err = response.json().popitem()

View File

@ -95,6 +95,9 @@ class ServerStatus(enum.StrEnum):
def is_paused(self) -> bool:
return self in [ServerStatus.PAUSED, ServerStatus.SUSPENDED]
def is_active(self) -> bool:
return self == ServerStatus.ACTIVE
def is_running(self) -> bool:
return self in [
@ -130,6 +133,9 @@ class PowerState(enum.IntEnum):
def is_paused(self) -> bool:
return self == PowerState.PAUSED
def is_suspended(self) -> bool:
return self == PowerState.SUSPENDED
def is_running(self) -> bool:
return self == PowerState.RUNNING
@ -302,10 +308,7 @@ class ServerInfo:
except Exception:
pass # Just ignore any error here
# Try to get flavor, only on >= 2.47
try:
flavor = d.get('flavor', {}).get('id', '')
except Exception:
flavor = ''
flavor = d.get('flavor', {}).get('id', '')
return ServerInfo(
id=d['id'],
name=d.get('name', d['id']), # On create server, name is not returned, so use id
@ -511,43 +514,6 @@ class SubnetInfo:
network_id=d['network_id'],
)
@dataclasses.dataclass
class PortInfo:
@dataclasses.dataclass
class FixedIpInfo:
ip_address: str
subnet_id: str
@staticmethod
def from_dict(d: dict[str, typing.Any]) -> 'PortInfo.FixedIpInfo':
return PortInfo.FixedIpInfo(
ip_address=d['ip_address'],
subnet_id=d['subnet_id'],
)
id: str
name: str
status: str
device_id: str
device_owner: str
mac_address: str
fixed_ips: list['FixedIpInfo']
@staticmethod
def from_dict(d: dict[str, typing.Any]) -> 'PortInfo':
return PortInfo(
id=d['id'],
name=d['name'],
status=d['status'],
device_id=d['device_id'],
device_owner=d['device_owner'],
mac_address=d['mac_address'],
fixed_ips=[PortInfo.FixedIpInfo.from_dict(ip) for ip in d['fixed_ips']],
)
@dataclasses.dataclass
class SecurityGroupInfo:
id: str