1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Done oVirt engine connector. Now the hard part, test it and fix it (it isn't already usable!!!)

This commit is contained in:
Adolfo Gómez 2012-11-22 10:28:52 +00:00
parent 400ac892f0
commit 0eea8a33b5
5 changed files with 450 additions and 158 deletions

View File

@ -134,6 +134,7 @@ encoding//src/uds/services/Vmware/client/Exceptions.py=utf-8
encoding//src/uds/services/Vmware/client/Server.py=utf-8
encoding//src/uds/services/Vmware/client/Task.py=utf-8
encoding//src/uds/services/Vmware/client/ws/VimService_types.py=utf-8
encoding//src/uds/services/Vmware_enterprise/Helpers.py=utf-8
encoding//src/uds/services/Vmware_enterprise/PublicationVC.py=utf-8
encoding//src/uds/services/Vmware_enterprise/ServiceProviderVC.py=utf-8
encoding//src/uds/services/Vmware_enterprise/VCLinkedCloneDeployed.py=utf-8

View File

@ -33,11 +33,12 @@
from uds.core.services import UserDeployment
from uds.core.util.State import State
import cPickle
import logging
logger = logging.getLogger(__name__)
opCreate, opStart, opStop, opSuspend, opWait, opError, opFinish, opRetry = range(8)
opCreate, opStart, opStop, opSuspend, opRemove, opWait, opError, opFinish, opRetry = range(9)
class OVirtLinkedDeployment(UserDeployment):
'''
@ -57,24 +58,27 @@ class OVirtLinkedDeployment(UserDeployment):
def initialize(self):
self._name = ''
self._ip = ''
self._mac = ''
self._vmid = ''
self._reason = ''
self._queue = []
self._destroyAfter = 'f'
# Serializable needed methods
def marshal(self):
'''
Does nothing right here, we will use envoronment storage in this sample
'''
return ''
return '\1'.join( ['v1', self._name, self._ip, self._mac, self._vmid, self._reason, cPickle.dumps(self._queue)] )
def unmarshal(self, str_):
'''
Does nothing here also, all data are keeped at environment storage
'''
pass
logger.debug('Data: {0}'.format(str_))
vals = str_.split('\1')
if vals[0] == 'v1':
self._name, self._ip, self._mac, self._vmid, self._reason, queue = vals[1:]
self._queue = cPickle.loads(queue)
def getName(self):
'''
@ -165,7 +169,8 @@ class OVirtLinkedDeployment(UserDeployment):
return self.__error('Machine is not available anymore')
if state not in ('up', 'powering_up', 'restoring_state'):
return self.__powerOn()
self._queue = [ opStart, opFinish ]
return self.__executeQueue()
self.cache().put('ready', '1')
return State.FINISHED
@ -177,30 +182,39 @@ class OVirtLinkedDeployment(UserDeployment):
logger.debug('Machine is ready. Moving to level 2')
self.__popCurrentOp() # Remove current state
return self.__executeQueue()
#if self._squeue.getCurrent() == stWaitReady:
# logger.debug('Move to level 2, suspending machine')
# return self.moveToCache(self.L2_CACHE)
# Do not need to go to level 2 (opWait is in fact "waiting for moving machine to cache level 2)
return State.FINISHED
def deployForUser(self, user):
'''
Deploys an service instance for an user.
'''
logger.debug('Deploying for user')
self.__initQueueForDeploy(False)
return self.__executeQueue()
def __executeQueue(self):
op = self.__getCurrentOp()
if op == opError:
return State.ERROR
if op == opFinish:
return State.FINISHED
if op == opCreate:
return self.__create()
def deployForCache(self, cacheLevel):
'''
Deploys an service instance for cache
'''
self.__initQueueForDeploy(cacheLevel == self.L2_CACHE)
return self.__executeQueue()
def __initQueueForDeploy(self, forLevel2 = False):
if forLevel2 is False:
self._queue = [opCreate, opStart, opFinish]
else:
self._queue = [opCreate, opStart, opWait, opSuspend, opFinish]
def __checkMachineState(self, chkState):
logger.debug('Checking that state of machine {0} is {1}'.format(self._vmid, chkState))
state = self.service().getMachineState(self._vmid)
if state != chkState:
return State.RUNNING
return State.FINISHED
def __getCurrentOp(self):
if len(self._queue) == 0:
@ -228,10 +242,43 @@ class OVirtLinkedDeployment(UserDeployment):
Returns:
State.ERROR, so we can do "return self.__error(reason)"
'''
logger.debug('Setting error state, reason: {0}'.format(reason))
self._queue = [opError]
self._reason = str(reason)
return State.ERROR
def __executeQueue(self):
self.__debugQueue('executeQueue')
op = self.__getCurrentOp()
if op == opError:
return State.ERROR
if op == opFinish:
return State.FINISHED
fncs = { opCreate: self.__create,
opRetry: self.__retry,
opStart: self.__startMachine,
opStop: self.__stopMachine,
opSuspend: self.__suspendMachine,
opWait: self.__wait,
opRemove: self.__remove
}
try:
execFnc = fncs.get(op, None)
if execFnc is None:
return self.__error('Unknown operation found at execution queue ({0})'.format(op))
state = execFnc()
return state
except Exception as e:
return self.__error(e)
# Queue execution methods
def __retry(self):
'''
@ -241,6 +288,12 @@ class OVirtLinkedDeployment(UserDeployment):
'''
return State.FINISHED
def __wait(self):
'''
Executes opWait, it simply waits something "external" to end
'''
return State.RUNNING
def __create(self):
'''
Deploys a machine from template for user/cache
@ -249,56 +302,106 @@ class OVirtLinkedDeployment(UserDeployment):
name = self.service().sanitizeVmName('UDS service ' + self.getName())
comments = 'UDS Linked clone for'
try:
self._vmid = self.service().deployFromTemplate(name, comments, templateId)
if self._vmid is None:
raise Exception('Can\'t create machine')
except Exception as e:
return self.__error(e)
self._vmid = self.service().deployFromTemplate(name, comments, templateId)
if self._vmid is None:
raise Exception('Can\'t create machine')
return State.RUNNING
def __powerOn(self):
def __remove(self):
'''
Removes a machine from system
'''
state = self.service().getMachineState(self._vmid)
if state != 'down' and state != 'suspended':
self.__pushFrontOp(opStop)
return State.RUNNING
self.service().removeMachine(self._vmid)
return State.RUNNING
def __startMachine(self):
'''
Powers on the machine
'''
state = self.service.getMachineState(self._vmid)
if state == 'down':
pass
state = self.service().getMachineState(self._vmid)
if state == 'up':
return State.FINISHED
def deployForUser(self, user):
if state != 'down':
self.__pushFrontOp(opRetry) # Remember here, the return State.FINISH will make this retry be "poped" right ar return
return State.FINISHED
self.service().startMachine(self._vmid)
return State.RUNNING
def __stopMachine(self):
'''
Deploys an service instance for an user.
Powers off the machine
'''
self.__initQueueForDeploy(False)
return self.__executeQueue()
state = self.service().getMachineState(self._vmid)
if state == 'down':
return State.FINISHED
if state != 'up':
self.__pushBackOp(opRetry) # Remember here, the return State.FINISH will make this retry be "poped" right ar return
return State.FINISHED
self.service().stopMachine(self._vmid)
return State.RUNNING
def deployForCache(self, cacheLevel):
def __suspendMachine(self):
'''
Deploys an service instance for cache
Suspends the machine
'''
forLevel2 = cacheLevel == self.L2_CACHE
self.__initQueueForDeploy(forLevel2)
return self.__executeQueue()
def __checkDeploy(self):
state = self.service().getMachineState(self._vmid)
if state == 'suspended':
return State.FINISHED
if state != 'up':
self.__pushBackOp(opRetry) # Remember here, the return State.FINISH will make this retry be "poped" right ar return
return State.FINISHED
self.service().suspendMachine(self._vmid)
return State.RUNNING
# Check methods
def __checkCreate(self):
'''
Checks the state of a deploy for an user or cache
'''
try:
state = self.service().getMachineState(self._vmid)
if state != 'down':
return State.RUNNING
except Exception as e:
return self.__error(e)
return State.FINISHED
return self.__checkMachineState('down')
def __checkStart(self):
'''
Checks if machine has started
'''
return self.__checkMachineState('up')
def __checkStop(self):
'''
Checks if machine has stoped
'''
return self.__checkMachineState('down')
def __checkSuspend(self):
'''
Check if the machine has suspended
'''
return self.__checkMachineState('suspended')
def __checkRemoved(self):
'''
Checks if a machine has been removed
'''
return self.__checkMachineState('unknown')
def checkState(self):
'''
Check what operation is going on, and acts acordly to it
'''
self.__debugQueue('checkState')
op = self.__getCurrentOp()
if op == opError:
@ -307,56 +410,43 @@ class OVirtLinkedDeployment(UserDeployment):
if op == opFinish:
return State.FINISHED
res = None
fncs = { opCreate: self.__checkCreate,
opRetry: self.__retry,
opWait: self.__wait,
opStart: self.__checkStart,
opStop: self.__checkStop,
opSuspend: self.__checkSuspend,
opRemove: self.__checkRemoved
}
if op == opCreate:
res = self.__checkDeploy()
if op == opStart:
res = self.__checkPowerOn()
try:
chkFnc = fncs.get(op, None)
if op == opStop:
res = self.__checkPowerOff()
if chkFnc is None:
return self.__error('Unknown operation found at check queue ({0})'.format(op))
if op == opWait:
res = State.RUNNING
state = chkFnc()
if state == State.FINISHED:
self.__popCurrentOp() # Remove runing op
return self.__executeQueue()
if op == opSuspend:
res = self.__checkSuspend()
if res is None:
return self.__error('Unexpected operation found')
if res == State.FINISHED:
self.__popCurrentOp()
return State.RUNNING
return res
return state
except Exception as e:
return self.__error(e)
def finish(self):
'''
Invoked when the core notices that the deployment of a service has finished.
(No matter wether it is for cache or for an user)
This gives the oportunity to make something at that moment.
:note: You can also make these operations at checkState, this is really
not needed, but can be provided (default implementation of base class does
nothing)
'''
# Note that this is not really needed, is just a sample of storage use
self.storage().remove('count')
pass
def assignToUser(self, user):
'''
This method is invoked whenever a cache item gets assigned to an user.
This gives the User Deployment an oportunity to do whatever actions
are required so the service puts at a correct state for using by a service.
In our sample, the service is always ready, so this does nothing.
This is not a task method. All level 1 cache items can be diretly
assigned to an user with no more work needed, but, if something is needed,
here you can do whatever you need
'''
pass
@ -364,35 +454,20 @@ class OVirtLinkedDeployment(UserDeployment):
'''
This method must be available so os managers can invoke it whenever
an user get logged into a service.
Default implementation does nothing, so if you are going to do nothing,
you don't need to implement it.
The responability of notifying it is of os manager actor, and it's
directly invoked by os managers (right now, linux os manager and windows
os manager)
The user provided is just an string, that is provided by actor.
'''
# We store the value at storage, but never get used, just an example
self.storage().saveData('user', user)
pass
def userLoggedOut(self, user):
'''
This method must be available so os managers can invoke it whenever
an user get logged out if a service.
Default implementation does nothing, so if you are going to do nothing,
you don't need to implement it.
The responability of notifying it is of os manager actor, and it's
directly invoked by os managers (right now, linux os manager and windows
os manager)
The user provided is just an string, that is provided by actor.
'''
# We do nothing more that remove the user
self.storage().remove('user')
pass
def reasonOfError(self):
'''
@ -402,18 +477,24 @@ class OVirtLinkedDeployment(UserDeployment):
for it, and it will be asked everytime it's needed to be shown to the
user (when the administation asks for it).
'''
return self.storage().readData('error') or 'No error'
return self._reason
def destroy(self):
'''
This is a task method. As that, the excepted return values are
State values RUNNING, FINISHED or ERROR.
Invoked for destroying a deployed service
Do whatever needed here, as deleting associated data if needed (i.e. a copy of the machine, snapshots, etc...)
@return: State.FINISHED if no more checks/steps for deployment are needed, State.RUNNING if more steps are needed (steps checked using checkState)
'''
return State.FINISHED
# If executing something, wait until finished to remove it
# We simply replace the execution queue
op = self.__getCurrentOp()
if op == opFinish or op == opWait:
self._queue = [opStop, opRemove]
return self.__executeQueue()
self._queue = [op, opStop, opRemove]
# Do not execute anything.here, just continue normally
return State.RUNNING
def cancel(self):
'''
@ -425,5 +506,22 @@ class OVirtLinkedDeployment(UserDeployment):
When administrator requests it, the cancel is "delayed" and not
invoked directly.
'''
return State.FINISHED
return self.destroy()
@staticmethod
def __op2str(op):
return { opCreate: 'create',
opStart: 'start',
opStop: 'stop',
opSuspend: 'suspend',
opRemove: 'remove',
opWait: 'wait',
opError: 'error',
opFinish: 'finish',
opRetry: 'retry'
}.get(op, '????')
def __debugQueue(self, txt):
logger.debug('Queue at {0}: {1}'.format(txt,[OVirtLinkedDeployment.__op2str(op) for op in self._queue ]))

View File

@ -77,7 +77,7 @@ class OVirtLinkedService(Service):
cacheTooltip = translatable('Number of desired machines to keep running waiting for a user')
#: If we need to generate a "Level 2" cache for this service (i.e., L1
#: could be running machines and L2 suspended machines)
usesCache_L2 = False
usesCache_L2 = True
#: Tooltip shown to user when this item is pointed at admin interface, None
#: also because we don't use it
cacheTooltip_L2 = translatable('Number of desired machines to keep suspended waiting for use')
@ -109,7 +109,7 @@ class OVirtLinkedService(Service):
datastore = gui.ChoiceField(label = translatable("Datastore Domain"), rdonly = False, order = 3,
tooltip = translatable('Datastore domain where to publish and put incrementals'), required = True)
baseName = gui.TextField(label = translatable('Machine Names'), rdonly = False, order = 4, tooltip = ('Base name for clones from this machine'), required = True)
lenName = gui.NumericField(length = 1, label = translatable('Name Length'), defvalue = 3, order = 5,
lenName = gui.NumericField(length = 1, label = translatable('Name Length'), defvalue = 5, order = 5,
tooltip = translatable('Length of numeric part for the names of this machines (betwen 3 and 6'), required = True)
ov = gui.HiddenField()
@ -197,6 +197,12 @@ class OVirtLinkedService(Service):
Id of the machine being created form template
'''
return self.parent().deployFromTemplate(name, comments, templateId, self.cluster.value)
def removeTemplate(self, templateId):
'''
invokes removeTemplate from parent provider
'''
return self.parent().removeTemplate(templateId)
def getMachineState(self, machineId):
'''
@ -207,23 +213,77 @@ class OVirtLinkedService(Service):
machineId: If of the machine to get state
Returns:
'down': Machine is not running
'unknown': Machine is not known
'powering_up': Machine is powering up
'up': Machine is up and running
'saving_state': Machine is "suspending"
'suspended': Machine is suspended
'restoring_state': Machine is restoring state (unsuspending)
'powering_down': Machine is powering down
'image_locked': Machine is creating/cloning and is not usable
one of this values:
unassigned, down, up, powering_up, powered_down,
paused, migrating_from, migrating_to, unknown, not_responding,
wait_for_launch, reboot_in_progress, saving_state, restoring_state,
suspended, image_illegal, image_locked or powering_down
Also can return'unknown' if Machine is not known
'''
return self.parent().getMachineState(machineId)
def startMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt.
This start also "resume" suspended/paused machines
Args:
machineId: Id of the machine
Returns:
'''
return self.parent().startMachine(machineId)
def removeTemplate(self, templateId):
def stopMachine(self, machineId):
'''
invokes removeTemplate from parent provider
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.parent().removeTemplate(templateId)
return self.parent().stopMachine(machineId)
def suspendMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.parent().suspendMachine(machineId)
def removeMachine(self, machineId):
'''
Tries to delete a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.parent().removeMachine(machineId)
def getMacRange(self):
'''
Returns de selected mac range
'''
return self.parent().getMacRange()
def getBaseName(self):
'''
Returns the base name
'''
return self.baseName.value
def getLenName(self):
'''
Returns the length of numbers part
'''
return int(self.lenName.value)

View File

@ -271,15 +271,12 @@ class Provider(ServiceProvider):
machineId: Id of the machine to get state
Returns:
'down': Machine is not running
'unknown': Machine is not known
'powering_up': Machine is powering up
'up': Machine is up and running
'saving_state': Machine is "suspending"
'suspended': Machine is suspended
'restoring_state': Machine is restoring state (unsuspending)
'powering_down': Machine is powering down
one of this values:
unassigned, down, up, powering_up, powered_down,
paused, migrating_from, migrating_to, unknown, not_responding,
wait_for_launch, reboot_in_progress, saving_state, restoring_state,
suspended, image_illegal, image_locked or powering_down
Also can return'unknown' if Machine is not known
'''
return self.__getApi().getMachineState(machineId)
@ -308,6 +305,52 @@ class Provider(ServiceProvider):
'''
return self.__getApi().deployFromTemplate(name, comments, templateId, clusterId)
def startMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt.
This start also "resume" suspended/paused machines
Args:
machineId: Id of the machine
Returns:
'''
return self.__getApi().startMachine(machineId)
def stopMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.__getApi().stopMachine(machineId)
def suspendMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.__getApi().suspendMachine(machineId)
def removeMachine(self, machineId):
'''
Tries to delete a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
return self.__getApi().removeMachine(machineId)
def getMacRange(self):
return self.macsRange.value

View File

@ -9,6 +9,7 @@ from ovirtsdk.api import API
import threading
import logging
import ovirtsdk
logger = logging.getLogger(__name__)
@ -316,21 +317,21 @@ class Client(object):
lock.release()
def makeTemplate(self, name, comments, vmId, clusterId, storageId):
def makeTemplate(self, name, comments, machineId, clusterId, storageId):
'''
Publish the machine (makes a template from it so we can create COWs) and returns the template id of
the creating machine
Args:
name: Name of the machine (care, only ascii characters and no spaces!!!)
vmId: id of the machine to be published
machineId: id of the machine to be published
clusterId: id of the cluster that will hold the machine
storageId: id of the storage tuat will contain the publication AND linked clones
Returns
Raises an exception if operation could not be acomplished, or returns the id of the template being created.
'''
print "n: {0}, c: {1}, vm: {2}, cl: {3}, st: {3}".format(name, comments, vmId, clusterId, storageId)
print "n: {0}, c: {1}, vm: {2}, cl: {3}, st: {3}".format(name, comments, machineId, clusterId, storageId)
try:
lock.acquire(True)
@ -341,11 +342,18 @@ class Client(object):
storage_domain = params.StorageDomain(id=storageId)
cluster = api.clusters.get(id=clusterId)
vm = api.vms.get(id=vmId)
vm = api.vms.get(id=machineId)
if vm is None:
raise Exception('Machine not found')
if cluster is None:
raise Exception('Cluster not found')
if vm.get_status().get_state() != 'down':
raise Exception('Machine must be in down state to publish it')
template = params.Template(name=name,storage_domain=storage_domain, vm=vm, cluster=cluster, description=comments)
return api.templates.add(template).get_id()
@ -400,8 +408,8 @@ class Client(object):
api = self.__getApi()
cluster = api.clusters.get(id=clusterId)
template = api.templates.get(id=templateId)
cluster = params.Cluster(id=clusterId)
template = params.Template(id=templateId)
if cluster is None:
raise Exception('Cluster not found')
@ -437,7 +445,6 @@ class Client(object):
# This returns nothing, if it fails it raises an exception
finally:
lock.release()
def getMachineState(self, machineId):
'''
@ -448,15 +455,12 @@ class Client(object):
machineId: Id of the machine to get status
Returns:
'down': Machine is not running
'unknown': Machine is not known
'powering_up': Machine is powering up
'up': Machine is up and running
'saving_state': Machine is "suspending"
'suspended': Machine is suspended
'restoring_state': Machine is restoring state (unsuspending)
'powering_down': Machine is powering down
one of this values:
unassigned, down, up, powering_up, powered_down,
paused, migrating_from, migrating_to, unknown, not_responding,
wait_for_launch, reboot_in_progress, saving_state, restoring_state,
suspended, image_illegal, image_locked or powering_down
Also can return'unknown' if Machine is not known
'''
try:
lock.acquire(True)
@ -472,16 +476,102 @@ class Client(object):
finally:
lock.release()
def powerOnMachine(self, machineId):
def startMachine(self, machineId):
'''
Tries to power on a machine. No check is done, it is simply requested to oVirt
Tries to start a machine. No check is done, it is simply requested to oVirt.
This start also "resume" suspended/paused machines
Args:
machineId: Id of the machine
Returns:
'''
try:
lock.acquire(True)
api = self.__getApi()
vm = api.vms.get(id=machineId)
if vm is None:
raise Exception('Machine not found')
vm.start()
finally:
lock.release()
def stopMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
try:
lock.acquire(True)
api = self.__getApi()
vm = api.vms.get(id=machineId)
if vm is None:
raise Exception('Machine not found')
vm.stop()
finally:
lock.release()
def suspendMachine(self, machineId):
'''
Tries to start a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
try:
lock.acquire(True)
api = self.__getApi()
vm = api.vms.get(id=machineId)
if vm is None:
raise Exception('Machine not found')
vm.suspend()
finally:
lock.release()
def removeMachine(self, machineId):
'''
Tries to delete a machine. No check is done, it is simply requested to oVirt
Args:
machineId: Id of the machine
Returns:
'''
try:
lock.acquire(True)
api = self.__getApi()
vm = api.vms.get(id=machineId)
if vm is None:
raise Exception('Machine not found')
vm.delete()
finally:
lock.release()