From ac116697f4d6e7eed10a9a6e5f5278ac085dfdbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Thu, 7 Mar 2024 23:20:00 +0100 Subject: [PATCH] Improving openstack, to allow more than one endpoint of a kind, and iterate over them in case of error --- server/src/uds/core/util/cache.py | 14 +- server/src/uds/services/OpenStack/helpers.py | 12 +- .../OpenStack/openstack/openstack_client.py | 859 ++++++++++++------ server/src/uds/services/OpenStack/provider.py | 2 +- server/src/uds/services/OpenStack/service.py | 8 +- 5 files changed, 593 insertions(+), 302 deletions(-) diff --git a/server/src/uds/core/util/cache.py b/server/src/uds/core/util/cache.py index b173139be..573a15f05 100644 --- a/server/src/uds/core/util/cache.py +++ b/server/src/uds/core/util/cache.py @@ -53,7 +53,6 @@ class Cache: misses = 0 _owner: str - _bowner: bytes @staticmethod def _basic_serialize(value: typing.Any) -> str: @@ -68,18 +67,17 @@ class Cache: def __init__(self, owner: typing.Union[str, bytes]): self._owner = owner.decode('utf-8') if isinstance(owner, bytes) else owner - self._bowner = self._owner.encode('utf8') - def __get_key(self, key: typing.Union[str, bytes]) -> str: + def _get_key(self, key: typing.Union[str, bytes]) -> str: if isinstance(key, str): key = key.encode('utf8') - return hash_key(self._bowner + key) + return hash_key(self._owner.encode() + key) def get(self, skey: typing.Union[str, bytes], default: typing.Any = None) -> typing.Any: now = sql_datetime() # logger.debug('Requesting key "%s" for cache "%s"', skey, self._owner) try: - key = self.__get_key(skey) + key = self._get_key(skey) # logger.debug('Key: %s', key) c: DBCache = DBCache.objects.get(owner=self._owner, pk=key) # If expired @@ -133,7 +131,7 @@ class Cache: """ # logger.debug('Removing key "%s" for uService "%s"' % (skey, self._owner)) try: - key = self.__get_key(skey) + key = self._get_key(skey) DBCache.objects.get(pk=key).delete() # @UndefinedVariable return True except DBCache.DoesNotExist: # @UndefinedVariable @@ -159,7 +157,7 @@ class Cache: # logger.debug('Saving key "%s" for cache "%s"' % (skey, self._owner,)) if validity is None: validity = consts.cache.DEFAULT_CACHE_TIMEOUT - key = self.__get_key(skey) + key = self._get_key(skey) strValue = Cache._serializer(value) now = sql_datetime() # Remove existing if any and create a new one @@ -200,7 +198,7 @@ class Cache: def refresh(self, skey: typing.Union[str, bytes]) -> None: # logger.debug('Refreshing key "%s" for cache "%s"' % (skey, self._owner,)) try: - key = self.__get_key(skey) + key = self._get_key(skey) c = DBCache.objects.get(pk=key) c.created = sql_datetime() c.save() diff --git a/server/src/uds/services/OpenStack/helpers.py b/server/src/uds/services/OpenStack/helpers.py index 27200a930..bb88fc7d3 100644 --- a/server/src/uds/services/OpenStack/helpers.py +++ b/server/src/uds/services/OpenStack/helpers.py @@ -71,17 +71,17 @@ def get_resources( ''' api, nameFromSubnets = getApi(parameters) - zones = [gui.choice_item(z, z) for z in api.listAvailabilityZones()] + zones = [gui.choice_item(z, z) for z in api.list_availability_zones()] networks = [ gui.choice_item(z['id'], z['name']) - for z in api.listNetworks(nameFromSubnets=nameFromSubnets) + for z in api.list_networks(nameFromSubnets=nameFromSubnets) ] - flavors = [gui.choice_item(z['id'], z['name']) for z in api.listFlavors()] + flavors = [gui.choice_item(z['id'], z['name']) for z in api.list_flavors()] securityGroups = [ - gui.choice_item(z['id'], z['name']) for z in api.listSecurityGroups() + gui.choice_item(z['id'], z['name']) for z in api.list_security_groups() ] volumeTypes = [gui.choice_item('-', _('None'))] + [ - gui.choice_item(t['id'], t['name']) for t in api.listVolumeTypes() + gui.choice_item(t['id'], t['name']) for t in api.list_volume_types() ] data: types.ui.CallbackResultType = [ @@ -105,7 +105,7 @@ def get_volumes( # Source volumes are all available for us # volumes = [gui.choice_item(v['id'], v['name']) for v in api.listVolumes() if v['name'] != '' and v['availability_zone'] == parameters['availabilityZone']] volumes = [ - gui.choice_item(v['id'], v['name']) for v in api.listVolumes() if v['name'] != '' + gui.choice_item(v['id'], v['name']) for v in api.list_volumes() if v['name'] != '' ] data: types.ui.CallbackResultType = [ diff --git a/server/src/uds/services/OpenStack/openstack/openstack_client.py b/server/src/uds/services/OpenStack/openstack/openstack_client.py index 6552b327f..5777659b5 100644 --- a/server/src/uds/services/OpenStack/openstack/openstack_client.py +++ b/server/src/uds/services/OpenStack/openstack/openstack_client.py @@ -18,34 +18,33 @@ # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PAdecorators.FTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OR TOdecorators.FT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ +import functools import logging import json import typing import collections.abc -# import dateutil.parser - from django.utils.translation import gettext as _ +from uds.core import consts -from uds.core.util import security +from uds.core.util import security, cache, decorators # Not imported at runtime, just for type checking if typing.TYPE_CHECKING: import requests - logger = logging.getLogger(__name__) # Required: Authentication v3 @@ -59,7 +58,7 @@ VERIFY_SSL = False # Helpers -def ensureResponseIsValid(response: 'requests.Response', errMsg: typing.Optional[str] = None) -> None: +def ensure_valid_response(response: 'requests.Response', errMsg: typing.Optional[str] = None) -> None: if response.ok is False: try: ( @@ -80,22 +79,23 @@ def ensureResponseIsValid(response: 'requests.Response', errMsg: typing.Optional raise Exception(errMsg) -def getRecurringUrlJson( - url: str, +def get_recurring_url_json( + endpoint: str, + path: str, session: 'requests.Session', headers: dict[str, str], key: str, params: typing.Optional[collections.abc.Mapping[str, str]] = None, - errMsg: typing.Optional[str] = None, + error_message: typing.Optional[str] = None, timeout: int = 10, ) -> collections.abc.Iterable[typing.Any]: counter = 0 while True: counter += 1 - logger.debug('Requesting url #%s: %s / %s', counter, url, params) - r = session.get(url, params=params, headers=headers, timeout=timeout) + logger.debug('Requesting url #%s: %s%s / %s', counter, endpoint, path, params) + r = session.get(endpoint + path, params=params, headers=headers, timeout=timeout) - ensureResponseIsValid(r, errMsg) + ensure_valid_response(r, error_message) j = r.json() @@ -105,33 +105,42 @@ def getRecurringUrlJson( if 'next' not in j: break - url = j['next'] + logger.debug('Json: %s', j) - -RT = typing.TypeVar('RT') + path = j['next'] # Decorators -def authRequired(func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]: - def ensurer(obj: 'Client', *args: typing.Any, **kwargs: typing.Any) -> RT: - obj.ensureAuthenticated() - try: +def auth_required(for_project: bool = False) -> collections.abc.Callable[[decorators.FT], decorators.FT]: + + def decorator(func: decorators.FT) -> decorators.FT: + @functools.wraps(func) + def wrapper(obj: 'Client', *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + if for_project is True: + if obj._projectid is None: + raise Exception('Need a project for method {}'.format(func)) + obj.ensure_authenticated() return func(obj, *args, **kwargs) - except Exception as e: - logger.error('Got error %s for openstack', e) - raise - return ensurer + return typing.cast(decorators.FT, wrapper) + + return decorator -def auth_project_required(func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]: - def ensurer(obj: 'Client', *args: typing.Any, **kwargs: typing.Any) -> RT: - if obj._projectid is None: - raise Exception('Need a project for method {}'.format(func)) - obj.ensureAuthenticated() - return func(obj, *args, **kwargs) - - return ensurer +def cache_key_helper(obj: 'Client', *args: typing.Any, **kwargs: typing.Any) -> str: + return '_'.join( + [ + obj._authurl, + obj._domain, + obj._username, + obj._password, + str(obj._projectid), + str(obj._region), + str(obj._access), + str(args), + str(kwargs), + ] + ) class Client: # pylint: disable=too-many-public-methods @@ -157,6 +166,9 @@ class Client: # pylint: disable=too-many-public-methods _timeout: int _session: 'requests.Session' + # Cache for data + cache: 'cache.Cache' + # Legacyversion is True for versions <= Ocata def __init__( self, @@ -198,14 +210,18 @@ class Client: # pylint: disable=too-many-public-methods if self._authurl[-1] != '/': self._authurl += '/' - def _get_endpoint_for(self, *type_: str) -> str: # If no region is indicatad, first endpoint is returned - def inner_get(for_type: str) -> str: + self.cache = cache.Cache(f'openstack_{host}_{port}_{domain}_{username}_{projectid}_{region}') + + def _get_endpoints_for(self, *endpoint_types: str) -> collections.abc.Generator[str, None, None]: + def inner_get(for_type: str) -> collections.abc.Generator[str, None, None]: if not self._catalog: raise Exception('No catalog for endpoints') + # Filter by type and interface for i in typing.cast( list[dict[str, typing.Any]], filter(lambda v: v['type'] == for_type, self._catalog) ): + # Filter for interface accessiblity (public, ...) for j in typing.cast( list[dict[str, typing.Any]], filter( @@ -213,26 +229,117 @@ class Client: # pylint: disable=too-many-public-methods typing.cast(list[dict[str, typing.Any]], i['endpoints']), ), ): + # Filter for region if present if not self._region or j['region'] == self._region: - if 'myhuaweicloud.eu/V1.0' not in j['url']: - return j['url'] - raise Exception('No endpoint url found') + # if 'myhuaweicloud.eu/V1.0' not in j['url']: + yield j['url'] - for t in type_: + for t in endpoint_types: try: - return inner_get(t) + yield from inner_get(t) except Exception: pass - raise Exception('No endpoint url found') - def _requestHeaders(self) -> dict[str, str]: + def _get_endpoint_for( + self, *endpoint_type: str + ) -> str: # If no region is indicatad, first endpoint is returned + try: + return next(self._get_endpoints_for(*endpoint_type)) + except StopIteration: + raise Exception('No endpoint url found') + + def _get_request_headers(self) -> dict[str, str]: headers = {'content-type': 'application/json'} if self._tokenid: headers['X-Auth-Token'] = self._tokenid return headers - def authPassword(self) -> None: + def _get_compute_endpoint(self) -> str: + return self._get_endpoint_for('compute', 'compute_legacy') + + def _get_endpoints_iterable(self, cache_key: str, *type_: str) -> list[str]: + # If endpoint is cached, use it as first endpoint + found_endpoints = list(self._get_endpoints_for(*type_)) + if self.cache.get(cache_key) in found_endpoints: + # If cached endpoint is in the list, use it as first endpoint + found_endpoints = [self.cache.get(cache_key)] + list( + set(found_endpoints) - {self.cache.get(cache_key)} + ) + + logger.debug('Endpoints for %s: %s', type_, found_endpoints) + + return found_endpoints + + @auth_required(for_project=True) + def _request_from_endpoint( + self, + type: typing.Literal['get', 'put', 'post', 'delete'], + endpoints_types: list[str], + path: str, + error_message: str, + data: typing.Any = None, + ) -> typing.Any: + cache_key = ''.join(endpoints_types) + found_endpoints = self._get_endpoints_iterable(cache_key, *endpoints_types) + + for i, endpoint in enumerate(found_endpoints): + try: + r = self._session.request( + type, + endpoint + path, + data=data, + headers=self._get_request_headers(), + timeout=self._timeout, + ) + ensure_valid_response(r, error_message) + return r + except Exception as e: + if i == len(found_endpoints) - 1: + raise e + logger.warning('Error requesting %s: %s', endpoint + path, e) + self.cache.remove(cache_key) + continue + + @auth_required(for_project=True) + def _get_recurring_from_endpoint( + self, + endpoint_types: list[str], + path: str, + error_message: str, + key: str, + params: typing.Optional[dict[str, str]] = None, + ) -> collections.abc.Iterable[typing.Any]: + cache_key = ''.join(endpoint_types) + found_endpoints = self._get_endpoints_iterable(cache_key, *endpoint_types) + + logger.debug('Requesting from endpoints: %s and path %s', found_endpoints, path) + # Iterate request over all endpoints, until one works, and store it as cached running endpoint + for i, endpoint in enumerate(found_endpoints): + try: + # If fails, cached endpoint is removed and next one is tried + self.cache.put( + cache_key, endpoint, consts.cache.EXTREME_CACHE_TIMEOUT + ) # Cache endpoint for a very long time + yield from get_recurring_url_json( + endpoint=endpoint, + path=path, + session=self._session, + headers=self._get_request_headers(), + key=key, + params=params, + error_message=error_message, + timeout=self._timeout, + ) + return + except Exception as e: + # If last endpoint, raise exception + if i == len(found_endpoints) - 1: + raise e + logger.warning('Error requesting %s: %s (%s)', endpoint + path, e, error_message) + self.cache.remove(cache_key) + + def authenticate_with_password(self) -> None: # logger.debug('Authenticating...') data: dict[str, typing.Any] = { 'auth': { @@ -266,7 +373,7 @@ class Client: # pylint: disable=too-many-public-methods timeout=self._timeout, ) - ensureResponseIsValid(r, 'Invalid Credentials') + ensure_valid_response(r, 'Invalid Credentials') self._authenticated = True self._tokenid = r.headers['X-Subject-Token'] @@ -290,231 +397,341 @@ class Client: # pylint: disable=too-many-public-methods else: self._volume = 'volumev2' - def ensureAuthenticated(self) -> None: + def ensure_authenticated(self) -> None: if self._authenticated is False or self._projectid != self._authenticatedProjectId: - self.authPassword() + self.authenticate_with_password() - @authRequired - def listProjects(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._authurl + 'v3/users/{user_id}/projects'.format(user_id=self._userid), - self._session, - headers=self._requestHeaders(), - key='projects', - errMsg='List Projects', - timeout=self._timeout, + @auth_required() + def list_projects(self) -> list[typing.Any]: + return list( + get_recurring_url_json( + self._authurl, + 'v3/users/{user_id}/projects'.format(user_id=self._userid), + self._session, + headers=self._get_request_headers(), + key='projects', + error_message='List Projects', + timeout=self._timeout, + ) ) - @authRequired - def listRegions(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._authurl + 'v3/regions/', - self._session, - headers=self._requestHeaders(), - key='regions', - errMsg='List Regions', - timeout=self._timeout, + @auth_required() + def list_regions(self) -> list[typing.Any]: + return list( + get_recurring_url_json( + self._authurl, + 'v3/regions/', + self._session, + headers=self._get_request_headers(), + key='regions', + error_message='List Regions', + timeout=self._timeout, + ) ) - @auth_project_required - def listServers( + @decorators.cached(prefix='svrs', timeout=consts.cache.DEFAULT_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_servers( self, detail: bool = False, params: typing.Optional[dict[str, str]] = None, - ) -> collections.abc.Iterable[typing.Any]: - path = '/servers/' + 'detail' if detail is True else '' - return getRecurringUrlJson( - self._get_endpoint_for('compute', 'compute_legacy') + path, - self._session, - headers=self._requestHeaders(), - key='servers', - params=params, - errMsg='List Vms', - timeout=self._timeout, + ) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=['compute', 'compute_legacy'], + path='/servers' + ('/detail' if detail is True else ''), + error_message='List Vms', + key='servers', + params=params, + ) ) - @auth_project_required - def listImages(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for('image') + '/v2/images?status=active', - self._session, - headers=self._requestHeaders(), - key='images', - errMsg='List Images', - timeout=self._timeout, + @decorators.cached(prefix='imgs', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_images(self) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=['image'], + path='/v2/images?status=active', + error_message='List Images', + key='images', + ) ) - @auth_project_required - def listVolumeTypes(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for(self._volume) + '/types', - self._session, - headers=self._requestHeaders(), - key='volume_types', - errMsg='List Volume Types', - timeout=self._timeout, + @decorators.cached(prefix='volts', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_volume_types(self) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=[self._volume], + path='/types', + error_message='List Volume Types', + key='volume_types', + ) ) - @auth_project_required - def listVolumes(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for(self._volume) + '/volumes/detail', - self._session, - headers=self._requestHeaders(), - key='volumes', - errMsg='List Volumes', - timeout=self._timeout, + # TODO: Remove this + # return get_recurring_url_json( + # self._get_endpoint_for(self._volume) + '/types', + # self._session, + # headers=self._get_request_headers(), + # key='volume_types', + # error_message='List Volume Types', + # timeout=self._timeout, + # ) + + @decorators.cached(prefix='vols', timeout=consts.cache.SHORT_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_volumes(self) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=[self._volume], + path='/volumes/detail', + error_message='List Volumes', + key='volumes', + ) ) - @auth_project_required - def listVolumeSnapshots( - self, volumeId: typing.Optional[dict[str, typing.Any]] = None - ) -> collections.abc.Iterable[typing.Any]: - for s in getRecurringUrlJson( - self._get_endpoint_for(self._volume) + '/snapshots', - self._session, - headers=self._requestHeaders(), - key='snapshots', - errMsg='List snapshots', - timeout=self._timeout, - ): - if volumeId is None or s['volume_id'] == volumeId: - yield s + # TODO: Remove this + # return get_recurring_url_json( + # self._get_endpoint_for(self._volume) + '/volumes/detail', + # self._session, + # headers=self._get_request_headers(), + # key='volumes', + # error_message='List Volumes', + # timeout=self._timeout, + # ) - @auth_project_required - def listAvailabilityZones(self) -> collections.abc.Iterable[typing.Any]: - for az in getRecurringUrlJson( - self._get_endpoint_for('compute', 'compute_legacy') + '/os-availability-zone', - self._session, - headers=self._requestHeaders(), - key='availabilityZoneInfo', - errMsg='List Availability Zones', - timeout=self._timeout, - ): - if az['zoneState']['available'] is True: - yield az['zoneName'] + def list_volume_snapshots( + self, volume_id: typing.Optional[dict[str, typing.Any]] = None + ) -> list[typing.Any]: + return [ + snapshot + for snapshot in self._get_recurring_from_endpoint( + endpoint_types=[self._volume], + path='/snapshots', + error_message='List snapshots', + key='snapshots', + ) + if volume_id is None or snapshot['volume_id'] == volume_id + ] - @auth_project_required - def listFlavors(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for('compute', 'compute_legacy') + '/flavors', - self._session, - headers=self._requestHeaders(), - key='flavors', - errMsg='List Flavors', - timeout=self._timeout, + # TODO: Remove this + # for s in get_recurring_url_json( + # self._get_endpoint_for(self._volume) + '/snapshots', + # self._session, + # headers=self._get_request_headers(), + # key='snapshots', + # error_message='List snapshots', + # timeout=self._timeout, + # ): + # if volume_id is None or s['volume_id'] == volume_id: + # yield s + + @decorators.cached(prefix='azs', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_availability_zones(self) -> list[typing.Any]: + return [ + availability_zone['zoneName'] + for availability_zone in self._get_recurring_from_endpoint( + endpoint_types=['compute', 'compute_legacy'], + path='/os-availability-zone', + error_message='List Availability Zones', + key='availabilityZoneInfo', + ) + if availability_zone['zoneState']['available'] is True + ] + + # TODO: Remove this + # for az in get_recurring_url_json( + # self._get_compute_endpoint() + '/os-availability-zone', + # self._session, + # headers=self._get_request_headers(), + # key='availabilityZoneInfo', + # error_message='List Availability Zones', + # timeout=self._timeout, + # ): + # if az['zoneState']['available'] is True: + # yield az['zoneName'] + + @decorators.cached(prefix='flvs', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_flavors(self) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=['compute', 'compute_legacy'], + path='/flavors', + error_message='List Flavors', + key='flavors', + ) ) - @auth_project_required - def listNetworks(self, name_from_subnets: bool=False) -> collections.abc.Iterable[typing.Any]: - nets = getRecurringUrlJson( - self._get_endpoint_for('network') + '/v2.0/networks', - self._session, - headers=self._requestHeaders(), + # TODO: Remove this + # return get_recurring_url_json( + # self._get_compute_endpoint() + '/flavors', + # self._session, + # headers=self._get_request_headers(), + # key='flavors', + # error_message='List Flavors', + # timeout=self._timeout, + # ) + + @decorators.cached(prefix='nets', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_networks(self, name_from_subnets: bool = False) -> list[typing.Any]: + nets = self._get_recurring_from_endpoint( + endpoint_types=['network'], + path='/v2.0/networks', + error_message='List Networks', key='networks', - errMsg='List Networks', - timeout=self._timeout, ) + + # TODO: Remove this + # nets = get_recurring_url_json( + # self._get_endpoint_for('network') + '/v2.0/networks', + # self._session, + # headers=self._get_request_headers(), + # key='networks', + # error_message='List Networks', + # timeout=self._timeout, + # ) if not name_from_subnets: - yield from nets + return list(nets) else: # Get and cache subnets names subnetNames = {s['id']: s['name'] for s in self.list_subnets()} + res: list[typing.Any] = [] for net in nets: name = ','.join(subnetNames[i] for i in net['subnets'] if i in subnetNames) net['old_name'] = net['name'] if name: net['name'] = name - yield net + res.append(net) + return res - @auth_project_required + @decorators.cached(prefix='subns', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) def list_subnets(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for('network') + '/v2.0/subnets', - self._session, - headers=self._requestHeaders(), - key='subnets', - errMsg='List Subnets', - timeout=self._timeout, + return list( + self._get_recurring_from_endpoint( + endpoint_types=['network'], + path='/v2.0/subnets', + error_message='List Subnets', + key='subnets', + ) ) - @auth_project_required - def listPorts( + # TODO: Remove this + # return get_recurring_url_json( + # self._get_endpoint_for('network') + '/v2.0/subnets', + # self._session, + # headers=self._get_request_headers(), + # key='subnets', + # error_message='List Subnets', + # timeout=self._timeout, + # ) + + @decorators.cached(prefix='sgps', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_ports( self, networkId: typing.Optional[str] = None, ownerId: typing.Optional[str] = None, - ) -> collections.abc.Iterable[typing.Any]: + ) -> list[typing.Any]: params: dict[str, typing.Any] = {} if networkId is not None: params['network_id'] = networkId if ownerId is not None: params['device_owner'] = ownerId - return getRecurringUrlJson( - self._get_endpoint_for('network') + '/v2.0/ports', - self._session, - headers=self._requestHeaders(), - key='ports', - params=params, - errMsg='List ports', - timeout=self._timeout, + return list( + self._get_recurring_from_endpoint( + endpoint_types=['network'], + path='/v2.0/ports', + error_message='List ports', + key='ports', + params=params, + ) ) - @auth_project_required - def listSecurityGroups(self) -> collections.abc.Iterable[typing.Any]: - return getRecurringUrlJson( - self._get_endpoint_for('compute', 'compute_legacy') + '/os-security-groups', - self._session, - headers=self._requestHeaders(), - key='security_groups', - errMsg='List security groups', - timeout=self._timeout, + # TODO: Remove this + # return get_recurring_url_json( + # self._get_endpoint_for('network') + '/v2.0/ports', + # self._session, + # headers=self._get_request_headers(), + # key='ports', + # params=params, + # error_message='List ports', + # timeout=self._timeout, + # ) + + @decorators.cached(prefix='sgps', timeout=consts.cache.LONG_CACHE_TIMEOUT, key_helper=cache_key_helper) + def list_security_groups(self) -> list[typing.Any]: + return list( + self._get_recurring_from_endpoint( + endpoint_types=['network'], + path='/v2.0/security-groups', + error_message='List security groups', + key='security_groups', + ) ) - @auth_project_required - def getServer(self, serverId: str) -> dict[str, typing.Any]: - r = self._session.get( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}'.format(server_id=serverId), - headers=self._requestHeaders(), - timeout=self._timeout, + # TODO: Remove this + # return get_recurring_url_json( + # self._get_compute_endpoint() + '/os-security-groups', + # self._session, + # headers=self._get_request_headers(), + # key='security_groups', + # error_message='List security groups', + # timeout=self._timeout, + # ) + + def get_server(self, server_id: str) -> dict[str, typing.Any]: + r = self._request_from_endpoint( + 'get', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}', + error_message='Get Server information', ) - ensureResponseIsValid(r, 'Get Server information') return r.json()['server'] - @auth_project_required - def getVolume(self, volumeId: str) -> dict[str, typing.Any]: - r = self._session.get( - self._get_endpoint_for(self._volume) + '/volumes/{volume_id}'.format(volume_id=volumeId), - headers=self._requestHeaders(), - timeout=self._timeout, + def get_volume(self, volumeId: str) -> dict[str, typing.Any]: + r = self._request_from_endpoint( + 'get', + endpoints_types=[self._volume], + path=f'/volumes/{volumeId}', + error_message='Get Volume information', ) - ensureResponseIsValid(r, 'Get Volume information') + # TODO: Remove this + # r = self._session.get( + # self._get_endpoint_for(self._volume) + '/volumes/{volume_id}'.format(volume_id=volumeId), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Get Volume information') return r.json()['volume'] - @auth_project_required - def get_snapshot(self, snapshotId: str) -> dict[str, typing.Any]: + def get_snapshot(self, snapshot_id: str) -> dict[str, typing.Any]: """ States are: creating, available, deleting, error, error_deleting """ - r = self._session.get( - self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshotId), - headers=self._requestHeaders(), - timeout=self._timeout, + r = self._request_from_endpoint( + 'get', + endpoints_types=[self._volume], + path=f'/snapshots/{snapshot_id}', + error_message='Get Snaphost information', ) - ensureResponseIsValid(r, 'Get Snaphost information') + # TODO: Remove this + # r = self._session.get( + # self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshotId), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Get Snaphost information') return r.json()['snapshot'] - @auth_project_required - def updateSnapshot( + def update_snapshot( self, - snapshotId: str, + snapshot_id: str, name: typing.Optional[str] = None, description: typing.Optional[str] = None, ) -> dict[str, typing.Any]: @@ -525,46 +742,62 @@ class Client: # pylint: disable=too-many-public-methods if description: data['snapshot']['description'] = description - r = self._session.put( - self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshotId), + r = self._request_from_endpoint( + 'put', + endpoints_types=[self._volume], + path=f'/snapshots/{snapshot_id}', data=json.dumps(data), - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Update Snaphost information', ) - ensureResponseIsValid(r, 'Update Snaphost information') + # TODO: Remove this + # r = self._session.put( + # self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshot_id), + # data=json.dumps(data), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Update Snaphost information') return r.json()['snapshot'] - @auth_project_required def create_volume_snapshot( - self, volumeId: str, name: str, description: typing.Optional[str] = None + self, volume_id: str, name: str, description: typing.Optional[str] = None ) -> dict[str, typing.Any]: description = description or 'UDS Snapshot' data = { 'snapshot': { 'name': name, 'description': description, - 'volume_id': volumeId, + 'volume_id': volume_id, 'force': True, } } # First, ensure volume is in state "available" - r = self._session.post( - self._get_endpoint_for(self._volume) + '/snapshots', + r = self._request_from_endpoint( + 'post', + endpoints_types=[self._volume], + path=f'/volumes/{volume_id}', data=json.dumps(data), - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Get Volume information', ) - ensureResponseIsValid(r, 'Cannot create snapshot. Ensure volume is in state "available"') + # TODO: Remove this + # r = self._session.post( + # self._get_endpoint_for(self._volume) + '/snapshots', + # data=json.dumps(data), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Cannot create snapshot. Ensure volume is in state "available"') return r.json()['snapshot'] - @auth_project_required - def createVolumeFromSnapshot( + def create_volume_from_snapshot( self, snapshotId: str, name: str, description: typing.Optional[str] = None ) -> dict[str, typing.Any]: description = description or 'UDS Volume' @@ -577,18 +810,26 @@ class Client: # pylint: disable=too-many-public-methods } } - r = self._session.post( - self._get_endpoint_for(self._volume) + '/volumes', + r = self._request_from_endpoint( + 'post', + endpoints_types=[self._volume], + path='/volumes', data=json.dumps(data), - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Create Volume from Snapshot', ) - ensureResponseIsValid(r, 'Cannot create volume from snapshot.') + # TODO: Remove this + # r = self._session.post( + # self._get_endpoint_for(self._volume) + '/volumes', + # data=json.dumps(data), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Cannot create volume from snapshot.') return r.json()['volume'] - @auth_project_required def create_server_from_snapshot( self, snapshotId: str, @@ -626,106 +867,158 @@ class Client: # pylint: disable=too-many-public-methods } } - r = self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') + '/servers', + r = self._request_from_endpoint( + 'post', + endpoints_types=['compute', 'compute_legacy'], + path='/servers', data=json.dumps(data), - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Create instance from snapshot', ) - ensureResponseIsValid(r, 'Cannot create instance from snapshot.') + # TODO: Remove this + # r = self._session.post( + # self._get_compute_endpoint() + '/servers', + # data=json.dumps(data), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Cannot create instance from snapshot.') return r.json()['server'] - @auth_project_required - def delete_server(self, serverId: str) -> None: + @auth_required(for_project=True) + def delete_server(self, server_id: str) -> None: + # This does not returns anything + self._request_from_endpoint( + 'delete', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}', + error_message='Cannot delete server (probably server does not exists).', + ) + + # TODO: Remove this # r = self._session.post( # self._getEndpointFor('compute', , 'compute_legacy') + '/servers/{server_id}/action'.format(server_id=serverId), # data='{"forceDelete": null}', # headers=self._requestHeaders(), # timeout=self._timeout # ) - r = self._session.delete( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}'.format(server_id=serverId), - headers=self._requestHeaders(), - timeout=self._timeout, - ) - ensureResponseIsValid(r, 'Cannot delete server (probably server does not exists).') + # r = self._session.delete( + # self._get_compute_endpoint() + f'/servers/{server_id}', + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + # ensure_valid_response(r, 'Cannot delete server (probably server does not exists).') + + def delete_snapshot(self, snapshot_id: str) -> None: # This does not returns anything - - @auth_project_required - def delete_snapshot(self, snapshotId: str) -> None: - r = self._session.delete( - self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshotId), - headers=self._requestHeaders(), - timeout=self._timeout, + self._request_from_endpoint( + 'delete', + endpoints_types=[self._volume], + path=f'/snapshots/{snapshot_id}', + error_message='Cannot remove snapshot.', ) - ensureResponseIsValid(r, 'Cannot remove snapshot.') + # TODO: Remove this + # r = self._session.delete( + # self._get_endpoint_for(self._volume) + '/snapshots/{snapshot_id}'.format(snapshot_id=snapshotId), + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Cannot remove snapshot.') # Does not returns a message body - @auth_project_required - def start_server(self, serverId: str) -> None: - r = self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}/action'.format(server_id=serverId), + def start_server(self, server_id: str) -> None: + # this does not returns anything + self._request_from_endpoint( + 'post', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}/action', data='{"os-start": null}', - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Starting server', ) - ensureResponseIsValid(r, 'Starting server') + # TODO: Remove this + # r = self._session.post( + # self._get_compute_endpoint() + f'/servers/{server_id}/action', + # data='{"os-start": null}', + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) - # This does not returns anything + # ensure_valid_response(r, 'Starting server') - @auth_project_required - def stop_server(self, serverId: str) -> None: - r = self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}/action'.format(server_id=serverId), + def stop_server(self, server_id: str) -> None: + # this does not returns anything + self._request_from_endpoint( + 'post', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}/action', data='{"os-stop": null}', - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Stoping server', ) - ensureResponseIsValid(r, 'Stoping server') + # TODO: Remove this + # r = self._session.post( + # self._get_compute_endpoint() + f'/servers/{server_id}/action', + # data='{"os-stop": null}', + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) - @auth_project_required - def suspend_server(self, serverId: str) -> None: - r = self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}/action'.format(server_id=serverId), + # ensure_valid_response(r, 'Stoping server') + + @auth_required(for_project=True) + def suspend_server(self, server_id: str) -> None: + # this does not returns anything + self._request_from_endpoint( + 'post', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}/action', data='{"suspend": null}', - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Suspending server', ) - ensureResponseIsValid(r, 'Suspending server') + # TODO: Remove this + # r = self._session.post( + # self._get_compute_endpoint() + f'/servers/{server_id}/action', + # data='{"suspend": null}', + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) - @auth_project_required - def resume_server(self, serverId: str) -> None: - r = self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}/action'.format(server_id=serverId), + # ensure_valid_response(r, 'Suspending server') + + def resume_server(self, server_id: str) -> None: + # This does not returns anything + self._request_from_endpoint( + 'post', + endpoints_types=['compute', 'compute_legacy'], + path=f'/servers/{server_id}/action', data='{"resume": null}', - headers=self._requestHeaders(), - timeout=self._timeout, + error_message='Resuming server', ) - ensureResponseIsValid(r, 'Resuming server') + # r = self._session.post( + # self._get_compute_endpoint() + f'/servers/{server_id}/action', + # data='{"resume": null}', + # headers=self._get_request_headers(), + # timeout=self._timeout, + # ) + + # ensure_valid_response(r, 'Resuming server') - @auth_project_required def reset_server(self, server_id: str) -> None: # Does not need return value self._session.post( - self._get_endpoint_for('compute', 'compute_legacy') - + '/servers/{server_id}/action'.format(server_id=server_id), + self._get_compute_endpoint() + f'/servers/{server_id}/action', data='{"reboot":{"type":"HARD"}}', - headers=self._requestHeaders(), + headers=self._get_request_headers(), timeout=self._timeout, ) @@ -736,7 +1029,7 @@ class Client: # pylint: disable=too-many-public-methods # First, ensure requested api is supported # We need api version 3.2 or greater try: - r = self._session.get(self._authurl, headers=self._requestHeaders()) + r = self._session.get(self._authurl, headers=self._get_request_headers()) except Exception: logger.exception('Testing') raise Exception('Connection error') @@ -746,7 +1039,7 @@ class Client: # pylint: disable=too-many-public-methods if v['id'] >= 'v3.1': # Tries to authenticate try: - self.authPassword() + self.authenticate_with_password() # Log some useful information logger.info('Openstack version: %s', v['id']) logger.info('Endpoints: %s', json.dumps(self._catalog, indent=4)) @@ -767,7 +1060,7 @@ class Client: # pylint: disable=too-many-public-methods def is_available(self) -> bool: try: # If we can connect, it is available - self._session.get(self._authurl, verify=VERIFY_SSL, headers=self._requestHeaders()) + self._session.get(self._authurl, headers=self._get_request_headers()) return True except Exception: return False diff --git a/server/src/uds/services/OpenStack/provider.py b/server/src/uds/services/OpenStack/provider.py index e4f196f89..4e8082627 100644 --- a/server/src/uds/services/OpenStack/provider.py +++ b/server/src/uds/services/OpenStack/provider.py @@ -50,7 +50,7 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) -INTERFACE_VALUES: list[types.ui.ChoiceItem] = [ +INTERFACE_VALUES: typing.Final[list[types.ui.ChoiceItem]] = [ gui.choice_item('public', 'public'), gui.choice_item('private', 'private'), gui.choice_item('admin', 'admin'), diff --git a/server/src/uds/services/OpenStack/service.py b/server/src/uds/services/OpenStack/service.py index 8595dc079..51e86927d 100644 --- a/server/src/uds/services/OpenStack/service.py +++ b/server/src/uds/services/OpenStack/service.py @@ -236,7 +236,7 @@ class OpenStackLiveService(services.Service): gui.choice_item(parentCurrent.region.value, parentCurrent.region.value) ] else: - regions = [gui.choice_item(r['id'], r['id']) for r in api.listRegions()] + regions = [gui.choice_item(r['id'], r['id']) for r in api.list_regions()] self.region.set_choices(regions) @@ -245,7 +245,7 @@ class OpenStackLiveService(services.Service): gui.choice_item(parentCurrent.tenant.value, parentCurrent.tenant.value) ] else: - tenants = [gui.choice_item(t['id'], t['name']) for t in api.listProjects()] + tenants = [gui.choice_item(t['id'], t['name']) for t in api.list_projects()] self.project.set_choices(tenants) # So we can instantiate parent to get API @@ -340,7 +340,7 @@ class OpenStackLiveService(services.Service): VERIFY_RESIZE. System is awaiting confirmation that the server is operational after a move or resize. SHUTOFF. The server was powered down by the user, either through the OpenStack Compute API or from within the server. For example, the user issued a shutdown -h command from within the server. If the OpenStack Compute manager detects that the VM was powered down, it transitions the server to the SHUTOFF status. """ - server = self.api.getServer(machineId) + server = self.api.get_server(machineId) if server['status'] in ('ERROR', 'DELETED'): logger.warning( 'Got server status %s for %s: %s', @@ -422,7 +422,7 @@ class OpenStackLiveService(services.Service): """ Gets the mac address of first nic of the machine """ - net = self.api.getServer(machineid)['addresses'] + net = self.api.get_server(machineid)['addresses'] vals = next(iter(net.values()))[ 0 ] # Returns "any" mac address of any interface. We just need only one interface info