Keep solving some issues with new transaction.atomic

Adolfo Gómez 2013-12-12 20:59:09 +00:00
parent dbc001ea22
commit 3cae37ca5c
7 changed files with 26 additions and 33 deletions
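The changes below keep adapting the code to the newer transaction API (presumably Django 1.6), where autocommit is the default and transaction.atomic(), usable as a decorator or as a context manager, replaces method-level decorators combined with manual commits. A minimal, hypothetical sketch of the pattern the hunks move toward; update_row and model_cls are illustrative names, not identifiers from this repository:

from django.db import transaction

def update_row(model_cls, pk, **fields):
    # Only the lock-and-update step runs inside a transaction; everything
    # else executes under Django's default autocommit behaviour.
    with transaction.atomic():
        obj = model_cls.objects.select_for_update().get(pk=pk)
        for name, value in fields.items():
            setattr(obj, name, value)
        obj.save()
    return obj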

View File

@ -101,23 +101,24 @@ class Scheduler(object):
'''
jobInstance = None
try:
now = getSqlDatetime() # Datetimes are based on database server times
filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now))
with transaction.atomic():
now = getSqlDatetime() # Datetimes are based on database server times
filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now))
# If the next execution is before now, or the last execution is in the future (the clock changed on this server), we take that task as executable
# These params are all set inside the filter (look at __init__)
job = dbScheduler.objects.select_for_update().filter(filter).order_by('next_execution')[0]
jobInstance = job.getInstance()
if jobInstance == None:
logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job))
job.delete()
return
logger.debug('Executing job:>{0}<'.format(job.name))
job.state = State.RUNNING
job.owner_server = self._hostname
job.last_execution = now
job.save()
jobInstance = job.getInstance()
if jobInstance == None:
logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job))
job.delete()
return
logger.debug('Executing job:>{0}<'.format(job.name))
JobThread(jobInstance, job).start() # Do not instantiate thread, just run it
except IndexError:
# Do nothing, there are no jobs for execution
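For context on the hunk above: under autocommit, Django refuses to evaluate a select_for_update() queryset outside a transaction (it raises TransactionManagementError), which is presumably why the locking query is now wrapped in transaction.atomic(). A rough, hypothetical sketch of claiming one pending row that way; claim_next_job and the state value 'R' are illustrative, not project identifiers:

from django.db import transaction

def claim_next_job(queryset, hostname):
    with transaction.atomic():
        # The row stays locked while this block is open, so only one server
        # can claim the job; an empty queryset raises IndexError, as above.
        job = queryset.select_for_update().order_by('next_execution')[0]
        job.state = 'R'
        job.owner_server = hostname
        job.save()
    return job  # the lock is released when the block commits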

View File

@ -216,7 +216,6 @@ class UserServiceManager(object):
state_date=now, creation_date=now, data = '', deployed_service = deployedServicePublication.deployed_service,
user = None, in_use = False )
@transaction.atomic
def __createAssignedAtDb(self, deployedServicePublication, user):
'''
Private method to instantiate an assigned element in the database with a default state
@ -238,7 +237,6 @@ class UserServiceManager(object):
state_date=now, creation_date=now, data='', publication=None, user=user, in_use=False)
@transaction.atomic
def createCacheFor(self, deployedServicePublication, cacheLevel):
'''
Creates a new cache for the deployed service publication at the indicated level
@ -270,7 +268,6 @@ class UserServiceManager(object):
return assigned
@transaction.atomic
def createAssignable(self, ds, deployed, user):
'''
Creates an assignable service
@ -304,7 +301,6 @@ class UserServiceManager(object):
cache.setState(State.PREPARING)
UserServiceOpChecker.makeUnique(cache, ci, state)
transaction.commit()
@transaction.atomic
def cancel(self, uService):
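One detail behind dropping these decorators, stated as general Django behaviour rather than as a claim about this exact code path: an explicit transaction.commit(), like the one left in place a few lines above, is forbidden while an atomic block is active, so method-level @transaction.atomic and manual commits do not combine well. A tiny illustrative snippet:

from django.db import transaction

def demo():
    with transaction.atomic():
        try:
            transaction.commit()  # manual commits are forbidden inside atomic
        except transaction.TransactionManagementError:
            pass                  # Django raises rather than break atomicity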

View File

@ -51,15 +51,16 @@ class Storage(object):
h.update(str(key))
return h.hexdigest()
@transaction.atomic
def saveData(self, skey, data, attr1 = None):
key = self.__getKey(skey)
data = data.encode(Storage.CODEC)
attr1 = '' if attr1 == None else attr1
try:
dbStorage.objects.create(owner = self._owner, key = key, data = data, attr1 = attr1 )
with transaction.atomic():
dbStorage.objects.create(owner = self._owner, key = key, data = data, attr1 = attr1 )
except Exception:
dbStorage.objects.filter(key=key).update(owner = self._owner, data = data, attr1 = attr1)
with transaction.atomic():
dbStorage.objects.filter(key=key).update(owner = self._owner, data = data, attr1 = attr1)
logger.debug('Key saved')
def put(self, skey, data):
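The hunk above moves from bare create()/update() calls to wrapping each attempt in its own transaction.atomic() block. That matches Django's documented guidance: a query that may fail needs its own atomic block (a savepoint) if the exception is going to be caught and handled, otherwise the surrounding transaction is left unusable. A rough sketch of the same insert-or-update fallback; save_value and its parameters are illustrative names, not the project's API:

from django.db import transaction

def save_value(model_cls, owner, key, **values):
    try:
        with transaction.atomic():  # savepoint around the INSERT that may fail
            model_cls.objects.create(owner=owner, key=key, **values)
    except Exception:               # e.g. IntegrityError on a duplicate key
        with transaction.atomic():
            model_cls.objects.filter(key=key).update(owner=owner, **values)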

View File

@ -176,7 +176,7 @@ class ClusterMigrationTask(DelayedTask):
logger.error('Can\'t update state of uService object')
class ClusterBalancingTask(DelayedTask):
def __init(self, providerId):
def __init__(self, providerId):
super(ClusterBalancingTask, self).__init__()
self._id = providerId
@ -240,7 +240,7 @@ class ClusterBalancingJob(Job):
frecuency = 90
friendly_name = 'Clustered Providers Balancing job'
def __init(self, environment):
def __init__(self, environment):
super(ClusterBalancingJob,self).__init__(environment)
def run(self):
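The two hunks above fix methods spelled __init instead of __init__. Python never calls a misspelled initializer: construction silently falls back to the parent's __init__, so attributes assigned in the typo'd method simply never exist. A small self-contained illustration with made-up class names:

class Task(object):
    def __init__(self, task_id=None):
        self.task_id = task_id

class Broken(Task):
    def __init(self, provider_id):       # typo: never invoked on construction
        self.provider_id = provider_id

t = Broken('p1')                         # no error: Task.__init__ runs instead
print(hasattr(t, 'provider_id'))         # False: the typo'd method never ran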

View File

@ -69,8 +69,8 @@ class ServiceCacheUpdater(Job):
def __notifyRestrain(deployedService):
log.doLog(deployedService, log.WARN, 'Deployed service is restrained due to errors', log.INTERNAL)
logger.info('Deployed service {0} is restrained, will check this later'.format(deployedService.name))
@transaction.atomic
@transaction.atomic
def bestDeployedServiceNeedingCacheUpdate(self):
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
@ -168,7 +168,6 @@ class ServiceCacheUpdater(Job):
# We also return the calculated values so we can reuse them
return selected, cachedL1, cachedL2, assigned
@transaction.atomic
def growL1Cache(self, ds, cacheL1, cacheL2, assigned):
'''
This method tries to enlarge the L1 cache.
@ -181,14 +180,15 @@ class ServiceCacheUpdater(Job):
# First, we try to assign from L2 cache
if cacheL2 > 0:
valid = None
for n in ds.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'):
if n.needsOsManager():
if State.isUsable(n.state) is False or State.isUsable(n.os_state):
with transaction.atomic():
for n in ds.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'):
if n.needsOsManager():
if State.isUsable(n.state) is False or State.isUsable(n.os_state):
valid = n
break
else:
valid = n
break
else:
valid = n
break
if valid is not None:
valid.moveToLevel(services.UserDeployment.L1_CACHE)
@ -201,7 +201,6 @@ class ServiceCacheUpdater(Job):
except:
logger.exception('Exception')
@transaction.atomic
def growL2Cache(self, ds, cacheL1, cacheL2, assigned):
'''
Tries to grow the L2 cache of the service.
@ -217,7 +216,6 @@ class ServiceCacheUpdater(Job):
logger.error(str(e))
# TODO: When alerts are ready, notify this
@transaction.atomic
def reduceL1Cache(self, ds, cacheL1, cacheL2, assigned):
logger.debug("Reducing L1 cache erasing a service in cache for {0}".format(ds))
# We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation
@ -244,7 +242,6 @@ class ServiceCacheUpdater(Job):
cache = cacheItems[0]
cache.removeOrCancel()
@transaction.atomic
def reduceL2Cache(self, ds, cacheL1, cacheL2, assigned):
logger.debug("Reducing L2 cache erasing a service in cache for {0}".format(ds))
if cacheL2 > 0:
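A pattern visible in the growL1Cache hunk above: rather than decorating the whole method, only the select_for_update() scan is wrapped in transaction.atomic(), so the row locks taken while looking for a candidate are released when the block commits instead of being held for the rest of the method. A rough, hypothetical sketch of that shape; pick_candidate and is_usable are illustrative names:

from django.db import transaction

def pick_candidate(queryset):
    candidate = None
    with transaction.atomic():
        # Row locks are held only while this block is open.
        for row in queryset.select_for_update().order_by('creation_date'):
            if row.is_usable():
                candidate = row
                break
    return candidate  # locks released once atomic() exits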

View File

@ -56,7 +56,6 @@ class UserServiceInfoItemsCleaner(Job):
def __init__(self, environment):
super(UserServiceInfoItemsCleaner,self).__init__(environment)
@transaction.atomic
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds = GlobalConfig.KEEP_INFO_TIME.getInt(True))
logger.debug('Removing information user services from {0}'.format(removeFrom))
@ -72,7 +71,6 @@ class UserServiceRemover(Job):
def __init__(self, environment):
super(UserServiceRemover,self).__init__(environment)
@transaction.atomic
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds=10) # We keep the machine at least 30 seconds before removing it, so we avoid connection errors
removables = UserService.objects.filter(state=State.REMOVABLE, state_date__lt=removeFrom)[0:UserServiceRemover.removeAtOnce]

View File

@ -81,7 +81,7 @@ class OVirtPublication(Publication):
'''
Performs the publication of the service
'''
self._name = self.service().sanitizeVmName('UDS Publication' + ' ' + self.dsName() + "-" + str(self.revision()))
self._name = self.service().sanitizeVmName('UDSP ' + self.dsName() + "-" + str(self.revision()))
comments = _('UDS pub for {0} at {1}').format( self.dsName(), str(datetime.now()).split('.')[0] )
self._reason = '' # No error, no reason for it
self._destroyAfter = 'f'