1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

chore: Update function names and parameters for OpenNebula storage and template enumeration

This commit is contained in:
Adolfo Gómez García 2024-08-29 03:59:10 +02:00
parent bb9073891c
commit 6d6b8e07d9
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
10 changed files with 142 additions and 135 deletions

View File

@ -142,17 +142,17 @@ class MetaPool(UUIDModel, TaggingMixin):
maintenance += 1
return total == maintenance
def is_access_allowed(self, chkDateTime: typing.Optional['datetime.datetime'] = None) -> bool:
def is_access_allowed(self, check_datetime: typing.Optional['datetime.datetime'] = None) -> bool:
"""
Checks if the access for a service pool is allowed or not (based esclusively on associated calendars)
"""
if chkDateTime is None:
chkDateTime = sql_now()
if check_datetime is None:
check_datetime = sql_now()
access = self.fallbackAccess
# Let's see if we can access by current datetime
for ac in sorted(self.calendarAccess.all(), key=operator.attrgetter('priority')):
if CalendarChecker(ac.calendar).check(chkDateTime):
if CalendarChecker(ac.calendar).check(check_datetime):
access = ac.access
break # Stops on first rule match found
@ -164,7 +164,7 @@ class MetaPool(UUIDModel, TaggingMixin):
If no "maximum" number of services, will return 0% ofc
cachedValue is used to optimize (if known the number of assigned services, we can avoid to query the db)
Note:
No metapoools, cachedValue is ignored, but keep for consistency with servicePool
No metapoools, cachedValue is ignored, but keep for consistency with servicePool usage signature
"""
# If no pools, return 0%
if self.members.count() == 0:
@ -194,13 +194,13 @@ class MetaPool(UUIDModel, TaggingMixin):
usage_count = 0
max_count = 0
for pool in query:
poolInfo = pool.usage(typing.cast(typing.Any, pool).usage_count) # usage_count is anottated value, integer
usage_count += poolInfo.used
pool_info = pool.usage(typing.cast(typing.Any, pool).usage_count) # usage_count is anottated value, integer
usage_count += pool_info.used
# If any of the pools has no max, then max is -1
if max_count == consts.UNLIMITED or poolInfo.total == consts.UNLIMITED:
if max_count == consts.UNLIMITED or pool_info.total == consts.UNLIMITED:
max_count = consts.UNLIMITED
else:
max_count += poolInfo.total
max_count += pool_info.total
if max_count == 0 or max_count == consts.UNLIMITED:
return types.pools.UsageInfo(usage_count, consts.UNLIMITED)

View File

@ -113,7 +113,7 @@ class Network(UUIDModel, TaggingMixin):
)
@staticmethod
def create(name: str, netRange: str) -> 'Network':
def create(name: str, net_range: str) -> 'Network':
"""
Creates an network record, with the specified network range. Supports IPv4 and IPv6
IPV4 has a versatile format, that can be:
@ -128,12 +128,12 @@ class Network(UUIDModel, TaggingMixin):
netRange: Network range in any supported format
"""
nr = net.network_from_str(netRange)
nr = net.network_from_str(net_range)
return Network.objects.create(
name=name,
start=Network.hexlify(nr.start),
end=Network.hexlify(nr.end),
net_string=netRange,
net_string=net_range,
version=nr.version,
)
@ -192,8 +192,8 @@ class Network(UUIDModel, TaggingMixin):
# if net_string is '*', then we are in all networks, return true
if self.net_string == '*':
return True
ipInt, version = net.ip_to_long(ip)
return self.net_start <= ipInt <= self.net_end and self.version == version
ip_int, version = net.ip_to_long(ip)
return self.version == version and self.net_start <= ip_int <= self.net_end
# utility method to allow "in" operator
__contains__ = contains

View File

@ -128,10 +128,10 @@ class Service(ManagedObjectModel, TaggingMixin):
return typing.cast('services.Service', self._cached_instance)
prov: 'services.ServiceProvider' = self.provider.get_instance()
sType = prov.get_service_by_type(self.data_type)
service_type = prov.get_service_by_type(self.data_type)
if sType:
obj = sType(self.get_environment(), prov, values, uuid=self.uuid)
if service_type:
obj = service_type(self.get_environment(), prov, values, uuid=self.uuid)
self.deserialize(obj, values)
else:
raise Exception(f'Service type of {self.data_type} is not recognized by provider {prov.mod_name}')

View File

@ -54,17 +54,24 @@ class StatsCountersAccum(models.Model):
def seconds(self) -> int:
"""Returns the number of seconds for this interval type"""
if self == self.HOUR:
match self:
case self.HOUR:
return 3600
if self == self.DAY:
case self.DAY:
return 86400
raise ValueError('Invalid interval type')
def prev(self) -> 'StatsCountersAccum.IntervalType':
"""Returns the previous interval type"""
if self == self.DAY:
match self:
case self.HOUR:
raise ValueError('No previous interval for HOUR')
case self.DAY:
return StatsCountersAccum.IntervalType.HOUR
raise ValueError('Invalid interval type')
def is_base_interval(self) -> bool:
"""Returns if this is the base interval"""
return self == StatsCountersAccum.IntervalType.HOUR
owner_type = models.SmallIntegerField(default=0)
owner_id = models.IntegerField(default=0)
@ -134,7 +141,10 @@ class StatsCountersAccum(models.Model):
type['StatsCountersAccum'],
type['StatsCounters'],
]
if interval_type == StatsCountersAccum.IntervalType.HOUR:
# If base interval, we will use StatsCounters to create the accum
# Else, we will use StatsCountersAccum to create the accum from previous interval
# (for example, to create daily accum from hourly data)
if interval_type.is_base_interval():
model = StatsCounters
else:
model = StatsCountersAccum

View File

@ -288,10 +288,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
Args:
name: Name (sanitized) of the machine
comments: Comments for machine
templateId: Id of the template to deploy from
displayType: 'vnc' or 'spice'. Display to use ad oVirt admin interface
memoryMB: Memory requested for machine, in MB
guaranteedMB: Minimum memory guaranteed for this machine
template_id: Id of the template to deploy from
Returns:
Info of the deployed machine

View File

@ -56,7 +56,7 @@ def ensure_connected(fnc: collections.abc.Callable[..., RT]) -> collections.abc.
# Result checker
def checkResultRaw(lst: typing.Any) -> str:
def check_result_raw(lst: typing.Any) -> str:
# Openebula response is always this way:
# [Boolean, String, ErrorCode]
# First is True if ok, False if not
@ -68,8 +68,8 @@ def checkResultRaw(lst: typing.Any) -> str:
return str(lst[1])
def checkResult(lst: typing.Any) -> tuple[collections.abc.Mapping[str, typing.Any], str]:
return xml2dict.parse(checkResultRaw(lst)), lst[1]
def check_result(lst: typing.Any) -> tuple[collections.abc.Mapping[str, typing.Any], str]:
return xml2dict.parse(check_result_raw(lst)), lst[1]
as_iterable = ensure.as_iterable
@ -80,7 +80,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
password: str
endpoint: str
connection: xmlrpc.client.ServerProxy
cachedVersion: typing.Optional[list[str]]
cached_version: typing.Optional[list[str]]
def __init__(self, username: str, password: str, endpoint: str) -> None:
self.username = username
@ -88,7 +88,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
self.endpoint = endpoint
# Connection "None" will be treated on ensureConnected, ignore its assignement here
self.connection = None # type: ignore
self.cachedVersion = None
self.cached_version = None
@property
def session_string(self) -> str:
@ -97,11 +97,11 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
@property
@ensure_connected
def version(self) -> list[str]:
if self.cachedVersion is None:
if self.cached_version is None:
# Retrieve Version & keep it
result = self.connection.one.system.version(self.session_string)
self.cachedVersion = checkResultRaw(result).split('.')
return self.cachedVersion
self.cached_version = check_result_raw(result).split('.')
return self.cached_version
def connect(self) -> None:
if not self.connection:
@ -110,16 +110,16 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
self.connection = xmlrpc.client.ServerProxy(self.endpoint)
@ensure_connected
def enumStorage(self, storageType: int = 0) -> collections.abc.Iterable[types.StorageType]:
def enum_storage(self, storageType: int = 0) -> collections.abc.Iterable[types.StorageType]:
sstorageType = str(storageType) # Ensure it is an string
# Invoke datastore pools info, no parameters except connection string
result, _ = checkResult(self.connection.one.datastorepool.info(self.session_string))
result, _ = check_result(self.connection.one.datastorepool.info(self.session_string))
for ds in as_iterable(result['DATASTORE_POOL']['DATASTORE']):
if ds['TYPE'] == sstorageType:
yield types.StorageType(ds['ID'], ds['NAME'], int(ds['TOTAL_MB']), int(ds['FREE_MB']), None)
@ensure_connected
def enumTemplates(self) -> collections.abc.Iterable[types.TemplateType]:
def enum_templates(self) -> collections.abc.Iterable[types.TemplateType]:
"""
Invoke templates pools info, with this parameters:
1.- Session string
@ -127,7 +127,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
3.- When the next parameter is >= -1 this is the Range start ID. Can be -1. For smaller values this is the offset used for pagination.
4.- For values >= -1 this is the Range end ID. Can be -1 to get until the last ID. For values < -1 this is the page size used for pagination.
"""
result, _ = checkResult(self.connection.one.templatepool.info(self.session_string, -1, -1, -1))
result, _ = check_result(self.connection.one.templatepool.info(self.session_string, -1, -1, -1))
for ds in as_iterable(result['VMTEMPLATE_POOL']['VMTEMPLATE']):
try:
yield types.TemplateType(ds['ID'], ds['NAME'], int(ds['TEMPLATE']['MEMORY']), None)
@ -135,7 +135,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
pass
@ensure_connected
def enumImages(self) -> collections.abc.Iterable[types.ImageType]:
def enum_images(self) -> collections.abc.Iterable[types.ImageType]:
"""
Invoke images pools info, with this parameters:
1.- Session string
@ -143,7 +143,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
3.- When the next parameter is >= -1 this is the Range start ID. Can be -1. For smaller values this is the offset used for pagination.
4.- For values >= -1 this is the Range end ID. Can be -1 to get until the last ID. For values < -1 this is the page size used for pagination.
"""
result, _ = checkResult(self.connection.one.imagepool.info(self.session_string, -1, -1, -1))
result, _ = check_result(self.connection.one.imagepool.info(self.session_string, -1, -1, -1))
for ds in as_iterable(result['IMAGE_POOL']['IMAGE']):
yield types.ImageType(
ds['ID'],
@ -156,14 +156,14 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
)
@ensure_connected
def templateInfo(self, templateId: str, extraInfo: bool = False) -> types.TemplateType:
def template_info(self, templateId: str, extraInfo: bool = False) -> types.TemplateType:
"""
Returns a list
first element is a dictionary (built from XML)
second is original XML
"""
result = self.connection.one.template.info(self.session_string, int(templateId), extraInfo)
ds, xml = checkResult(result)
ds, xml = check_result(result)
return types.TemplateType(
ds['VMTEMPLATE']['ID'],
ds['VMTEMPLATE']['NAME'],
@ -172,7 +172,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
)
@ensure_connected
def instantiateTemplate(
def instantiate_template(
self,
templateId: str,
vmName: str,
@ -204,10 +204,10 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
privatePersistent,
)
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def updateTemplate(self, templateId: str, templateData: str, updateType: int = 0) -> str:
def update_template(self, templateId: str, templateData: str, updateType: int = 0) -> str:
"""
Updates the template with the templateXml
1.- Session string
@ -218,10 +218,10 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
result = self.connection.one.template.update(
self.session_string, int(templateId), templateData, int(updateType)
)
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def cloneTemplate(self, templateId: str, name: str) -> str:
def clone_template(self, templateId: str, name: str) -> str:
"""
Clones the template
"""
@ -232,39 +232,39 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
self.session_string, int(templateId), name, False
) # This works as previous version clone
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def deleteTemplate(self, templateId: str) -> str:
def delete_template(self, templateId: str) -> str:
"""
Deletes the template (not images)
"""
result = self.connection.one.template.delete(self.session_string, int(templateId))
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def cloneImage(self, srcId: str, name: str, datastoreId: typing.Union[str, int] = -1) -> str:
def clone_image(self, srcId: str, name: str, datastoreId: typing.Union[str, int] = -1) -> str:
"""
Clones the image.
"""
result = self.connection.one.image.clone(self.session_string, int(srcId), name, int(datastoreId))
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def makePersistentImage(self, imageId: str, persistent: bool = False) -> str:
def make_persistent_image(self, imageId: str, persistent: bool = False) -> str:
"""
Clones the image.
"""
result = self.connection.one.image.persistent(self.session_string, int(imageId), persistent)
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def deleteImage(self, imageId: str) -> str:
def delete_image(self, imageId: str) -> str:
"""
Deletes an image
"""
result = self.connection.one.image.delete(self.session_string, int(imageId))
return checkResultRaw(result)
return check_result_raw(result)
@ensure_connected
def image_info(self, imginfo: str) -> types.ImageType:
@ -273,7 +273,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
first element is a dictionary (built from XML)
second is original XML
"""
result, xml = checkResult(self.connection.one.image.info(self.session_string, int(imginfo)))
result, xml = check_result(self.connection.one.image.info(self.session_string, int(imginfo)))
ds = result['IMAGE']
return types.ImageType(
ds['ID'],
@ -295,7 +295,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
4.- For values >= -1 this is the Range end ID. Can be -1 to get until the last ID. For values < -1 this is the page size used for pagination.
5.- VM state to filter by. (-2 = any state including DONE, -1 = any state EXCEPT DONE)
"""
result, _ = checkResult(self.connection.one.vmpool.info(self.session_string, -1, -1, -1, -1))
result, _ = check_result(self.connection.one.vmpool.info(self.session_string, -1, -1, -1, -1))
if result['VM_POOL']:
for ds in as_iterable(result['VM_POOL'].get('VM', [])):
yield types.VirtualMachineType(
@ -307,13 +307,13 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
)
@ensure_connected
def VMInfo(self, vmId: str) -> types.VirtualMachineType:
def vm_info(self, vmId: str) -> types.VirtualMachineType:
"""
Returns a list
first element is a dictionary (built from XML)
second is original XML
"""
result, xml = checkResult(self.connection.one.vm.info(self.session_string, int(vmId)))
result, xml = check_result(self.connection.one.vm.info(self.session_string, int(vmId)))
ds = result['VM']
return types.VirtualMachineType(
ds['ID'],
@ -339,7 +339,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
"""
Returns the VM State
"""
return self.VMInfo(vmId).state
return self.vm_info(vmId).state
@ensure_connected
def get_machine_substate(self, vmId: str) -> int:
@ -347,7 +347,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
Returns the VM State
"""
result = self.connection.one.vm.info(self.session_string, int(vmId))
r, _ = checkResult(result)
r, _ = check_result(result)
try:
if int(r['VM']['STATE']) == types.VmState.ACTIVE.value:
return int(r['VM']['LCM_STATE'])
@ -360,4 +360,4 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
@ensure_connected
def set_machine_state(self, vmId: str, action: str) -> str:
result = self.connection.one.vm.action(self.session_string, action, int(vmId))
return checkResultRaw(result)
return check_result_raw(result)

View File

@ -47,4 +47,4 @@ def enumerateDatastores(
"""
0 seems to be images datastore
"""
yield from api.enumStorage(datastoreType)
yield from api.enum_storage(datastoreType)

View File

@ -47,16 +47,16 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
def getTemplates(
def enumerate_templates(
api: 'client.OpenNebulaClient', force: bool = False
) -> collections.abc.Iterable[types.TemplateType]:
for t in api.enumTemplates():
for t in api.enum_templates():
if t.name[:4] != 'UDSP':
yield t
def create(
api: 'client.OpenNebulaClient', fromTemplateId: str, name: str, toDataStore: str
api: 'client.OpenNebulaClient', from_template_id: str, name: str, dst_dataestor: str
) -> str:
"""
Publish the machine (makes a template from it so we can create COWs) and returns the template id of
@ -72,75 +72,75 @@ def create(
Note:
Maybe we need to also clone the hard disk?
"""
templateId = None
template_id = None
try:
# First, we clone the themplate itself
# templateId = api.call('template.clone', int(fromTemplateId), name)
templateId = api.cloneTemplate(fromTemplateId, name)
template_id = api.clone_template(from_template_id, name)
# Now copy cloned images if possible
imgs = {i.name: i.id for i in api.enumImages()}
imgs = {i.name: i.id for i in api.enum_images()}
info = api.templateInfo(templateId).xml
info = api.template_info(template_id).xml
template: typing.Any = minidom.parseString(info).getElementsByTagName('TEMPLATE')[0] # pyright: ignore
logger.debug('XML: %s', template.toxml()) # pyright: ignore
for counter, dsk in enumerate(template.getElementsByTagName('DISK')): # pyright: ignore
imgIds = dsk.getElementsByTagName('IMAGE_ID')
if not imgIds:
fromId = False
images_ids = dsk.getElementsByTagName('IMAGE_ID')
if not images_ids:
from_id = False
try:
node = dsk.getElementsByTagName('IMAGE')[0].childNodes[0]
except IndexError:
continue # Skip this unknown node
imgName = node.data
image_name = node.data
# Locate
try:
imgId = imgs[imgName.strip()]
image_id = imgs[image_name.strip()]
except KeyError:
raise Exception(
'Image "{}" could not be found!. Check the opennebula template'.format(
imgName.strip()
image_name.strip()
)
)
else:
fromId = True
node = imgIds[0].childNodes[0]
imgId = node.data
from_id = True
node = images_ids[0].childNodes[0]
image_id = node.data
logger.debug('Found %s for cloning', imgId)
logger.debug('Found %s for cloning', image_id)
# if api.imageInfo(imgId)[0]['IMAGE']['STATE'] != '1':
# raise Exception('The base machines images are not in READY state')
# Now clone the image
imgName = sanitized_name(name + ' DSK ' + str(counter))
newId = api.cloneImage(
imgId, imgName, toDataStore
image_name = sanitized_name(name + ' DSK ' + str(counter))
new_id = api.clone_image(
image_id, image_name, dst_dataestor
) # api.call('image.clone', int(imgId), imgName, int(toDataStore))
# Now Store id/name
if fromId is True:
node.data = str(newId)
if from_id is True:
node.data = str(new_id)
else:
node.data = imgName
node.data = image_name
# Now update the clone
# api.call('template.update', templateId, template.toxml())
api.updateTemplate(templateId, template.toxml())
api.update_template(template_id, template.toxml())
return templateId
return template_id
except Exception as e:
logger.exception('Creating template on OpenNebula')
try:
api.deleteTemplate(
templateId
api.delete_template(
template_id
) # Try to remove created template in case of fail
except Exception:
pass
raise e
def remove(api: 'client.OpenNebulaClient', templateId: str) -> None:
def remove(api: 'client.OpenNebulaClient', template_id: str) -> None:
"""
Removes a template from ovirt server
@ -150,37 +150,37 @@ def remove(api: 'client.OpenNebulaClient', templateId: str) -> None:
# First, remove Images (wont be possible if there is any images already in use, but will try)
# Now copy cloned images if possible
try:
imgs = {i.name: i.id for i in api.enumImages()}
imgs = {i.name: i.id for i in api.enum_images()}
info = api.templateInfo(templateId).xml
info = api.template_info(template_id).xml
template: typing.Any = minidom.parseString(info).getElementsByTagName('TEMPLATE')[0] # pyright: ignore
logger.debug('XML: %s', template.toxml())
for dsk in template.getElementsByTagName('DISK'):
imgIds = dsk.getElementsByTagName('IMAGE_ID')
if not imgIds:
images_ids = dsk.getElementsByTagName('IMAGE_ID')
if not images_ids:
try:
node = dsk.getElementsByTagName('IMAGE')[0].childNodes[0]
except IndexError:
continue
imgId = imgs[node.data]
image_id = imgs[node.data]
else:
node = imgIds[0].childNodes[0]
imgId = node.data
node = images_ids[0].childNodes[0]
image_id = node.data
logger.debug('Found %s for cloning', imgId)
logger.debug('Found %s for cloning', image_id)
# Now delete the image
api.deleteImage(imgId) # api.call('image.delete', int(imgId))
api.delete_image(image_id) # api.call('image.delete', int(imgId))
except Exception:
logger.exception('Removing image')
api.deleteTemplate(templateId) # api.call('template.delete', int(templateId))
api.delete_template(template_id) # api.call('template.delete', int(templateId))
except Exception:
logger.error('Removing template on OpenNebula')
def deployFrom(api: 'client.OpenNebulaClient', templateId: str, name: str) -> str:
def deploy_from(api: 'client.OpenNebulaClient', template_id: str, name: str) -> str:
"""
Deploys a virtual machine on selected cluster from selected template
@ -192,10 +192,10 @@ def deployFrom(api: 'client.OpenNebulaClient', templateId: str, name: str) -> st
Returns:
Id of the machine being created form template
"""
vmId = api.instantiateTemplate(
templateId, name, False, '', False
vmid = api.instantiate_template(
template_id, name, False, '', False
) # api.call('template.instantiate', int(templateId), name, False, '')
return vmId
return vmid
def check_published(api: 'client.OpenNebulaClient', template_id: str) -> bool:
@ -203,27 +203,27 @@ def check_published(api: 'client.OpenNebulaClient', template_id: str) -> bool:
checks if the template is fully published (images are ready...)
"""
try:
imgs = {i.name: i.id for i in api.enumImages()}
images = {i.name: i.id for i in api.enum_images()}
info = api.templateInfo(template_id).xml
info = api.template_info(template_id).xml
template: typing.Any = minidom.parseString(info).getElementsByTagName('TEMPLATE')[0] # pyright: ignore
logger.debug('XML: %s', template.toxml())
for dsk in template.getElementsByTagName('DISK'):
imgIds = dsk.getElementsByTagName('IMAGE_ID')
if not imgIds:
images_ids = dsk.getElementsByTagName('IMAGE_ID')
if not images_ids:
try:
node = dsk.getElementsByTagName('IMAGE')[0].childNodes[0]
except IndexError:
continue
imgId = imgs[node.data]
image_id = images[node.data]
else:
node = imgIds[0].childNodes[0]
imgId = node.data
node = images_ids[0].childNodes[0]
image_id = node.data
logger.debug('Found %s for checking', imgId)
logger.debug('Found %s for checking', image_id)
state = api.image_info(imgId).state
state = api.image_info(image_id).state
if state in (types.ImageState.INIT, types.ImageState.LOCKED):
return False
if state != types.ImageState.READY: # If error is not READY
@ -232,7 +232,7 @@ def check_published(api: 'client.OpenNebulaClient', template_id: str) -> bool:
)
# Ensure image is non persistent. This may be invoked more than once, but it does not matters
api.makePersistentImage(imgId, False)
api.make_persistent_image(image_id, False)
except Exception:
logger.exception('Exception checking published')

View File

@ -141,7 +141,7 @@ def shutdown_machine(api: 'client.OpenNebulaClient', vmid: str) -> None:
logger.error('Error shutting down %s on OpenNebula: %s', vmid, e)
def reset_machine(api: 'client.OpenNebulaClient', machineId: str) -> None:
def reset_machine(api: 'client.OpenNebulaClient', vmid: str) -> None:
'''
Tries to suspend a machine. No check is done, it is simply requested to OpenNebula
@ -151,12 +151,12 @@ def reset_machine(api: 'client.OpenNebulaClient', machineId: str) -> None:
Returns:
'''
try:
api.set_machine_state(machineId, 'reboot-hard')
api.set_machine_state(vmid, 'reboot-hard')
except Exception as e:
logger.error('Error reseting %s on OpenNebula: %s', machineId, e)
logger.error('Error reseting %s on OpenNebula: %s', vmid, e)
def remove_machine(api: 'client.OpenNebulaClient', machineId: str) -> None:
def remove_machine(api: 'client.OpenNebulaClient', vmid: str) -> None:
'''
Tries to delete a machine. No check is done, it is simply requested to OpenNebula
@ -168,9 +168,9 @@ def remove_machine(api: 'client.OpenNebulaClient', machineId: str) -> None:
try:
# vm = oca.VirtualMachine.new_with_id(api, int(machineId))
# vm.delete()
api.remove_machine(machineId)
api.remove_machine(vmid)
except Exception as e:
err = 'Error removing machine {} on OpenNebula: {}'.format(machineId, e)
err = 'Error removing machine {} on OpenNebula: {}'.format(vmid, e)
logger.exception(err)
raise Exception(err)
@ -197,19 +197,19 @@ def enumerate_machines(
def get_network_info(
api: 'client.OpenNebulaClient',
vmid: str,
networkId: typing.Optional[str] = None,
network_id: typing.Optional[str] = None,
) -> tuple[str, str]:
'''
Get the MAC and the IP for the network and machine. If network is None, for the first network
'''
# md = minidom.parseString(api.call('vm.info', int(machineId)))
md: typing.Any = minidom.parseString(api.VMInfo(vmid).xml or '') # pyright: ignore[reportUnknownMemberType]
md: typing.Any = minidom.parseString(api.vm_info(vmid).xml or '') # pyright: ignore[reportUnknownMemberType]
node = md
try:
for nic in md.getElementsByTagName('NIC'):
netId = nic.getElementsByTagName('NETWORK_ID')[0].childNodes[0].data
if networkId is None or int(netId) == int(networkId):
net_id = nic.getElementsByTagName('NETWORK_ID')[0].childNodes[0].data
if network_id is None or int(net_id) == int(network_id):
node = nic
break
except Exception:
@ -230,13 +230,13 @@ def get_network_info(
def get_console_connection(
api: 'client.OpenNebulaClient', machineId: str
api: 'client.OpenNebulaClient', vmid: str
) -> typing.Optional[core_types.services.ConsoleConnectionInfo]:
'''
If machine is not running or there is not a display, will return NONE
SPICE connections should check that 'type' is 'SPICE'
'''
md: typing.Any = minidom.parseString(api.VMInfo(machineId).xml or '') # pyright: ignore[reportUnknownMemberType]
md: typing.Any = minidom.parseString(api.vm_info(vmid).xml or '') # pyright: ignore[reportUnknownMemberType]
try:
graphics = md.getElementsByTagName('GRAPHICS')[0]
@ -247,8 +247,8 @@ def get_console_connection(
except Exception:
passwd = ''
lastChild: typing.Any = md.getElementsByTagName('HISTORY_RECORDS')[0].lastChild
address = lastChild.getElementsByTagName('HOSTNAME')[0].childNodes[0].data if lastChild else ''
last_child: typing.Any = md.getElementsByTagName('HISTORY_RECORDS')[0].lastChild
address = last_child.getElementsByTagName('HOSTNAME')[0].childNodes[0].data if last_child else ''
return core_types.services.ConsoleConnectionInfo(
type=type_,

View File

@ -168,7 +168,7 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
yield from on.storage.enumerateDatastores(self.api, datastoreType)
def getTemplates(self, force: bool = False) -> collections.abc.Iterable[on.types.TemplateType]:
yield from on.template.getTemplates(self.api, force)
yield from on.template.enumerate_templates(self.api, force)
def make_template(self, from_template_id: str, name: str, dest_storage: str) -> str:
return on.template.create(self.api, from_template_id, name, dest_storage)
@ -180,7 +180,7 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
on.template.remove(self.api, templateId)
def deply_from_template(self, name: str, templateId: str) -> str:
return on.template.deployFrom(self.api, templateId, name)
return on.template.deploy_from(self.api, templateId, name)
def getMachineState(self, machineId: str) -> on.types.VmState:
'''