From 3cae37ca5c2c549576a09511f46fa25c2308b0e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez?= Date: Thu, 12 Dec 2013 20:59:09 +0000 Subject: [PATCH] Keep solving some issues with new transaction.atomic --- server/src/uds/core/jobs/Scheduler.py | 19 +++++++++-------- .../uds/core/managers/UserServiceManager.py | 4 ---- server/src/uds/core/util/Storage.py | 7 ++++--- .../workers/ClusteredProviderManagement.py | 4 ++-- .../uds/core/workers/ServiceCacheUpdater.py | 21 ++++++++----------- .../uds/core/workers/UserServiceCleaner.py | 2 -- .../uds/services/OVirt/OVirtPublication.py | 2 +- 7 files changed, 26 insertions(+), 33 deletions(-) diff --git a/server/src/uds/core/jobs/Scheduler.py b/server/src/uds/core/jobs/Scheduler.py index b1942823..d62c1a6b 100644 --- a/server/src/uds/core/jobs/Scheduler.py +++ b/server/src/uds/core/jobs/Scheduler.py @@ -101,23 +101,24 @@ class Scheduler(object): ''' jobInstance = None try: + now = getSqlDatetime() # Datetimes are based on database server times + filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now)) with transaction.atomic(): - now = getSqlDatetime() # Datetimes are based on database server times - filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now)) # If next execution is before now or last execution is in the future (clock changed on this server, we take that task as executable) # This params are all set inside filter (look at __init__) job = dbScheduler.objects.select_for_update().filter(filter).order_by('next_execution')[0] - jobInstance = job.getInstance() - - if jobInstance == None: - logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job)) - job.delete() - return - logger.debug('Executing job:>{0}<'.format(job.name)) job.state = State.RUNNING job.owner_server = self._hostname job.last_execution = now job.save() + + jobInstance = job.getInstance() + + if jobInstance == None: + logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job)) + job.delete() + return + logger.debug('Executing job:>{0}<'.format(job.name)) JobThread(jobInstance, job).start() # Do not instatiate thread, just run it except IndexError: # Do nothing, there is no jobs for execution diff --git a/server/src/uds/core/managers/UserServiceManager.py b/server/src/uds/core/managers/UserServiceManager.py index f32cc384..117de1a0 100644 --- a/server/src/uds/core/managers/UserServiceManager.py +++ b/server/src/uds/core/managers/UserServiceManager.py @@ -216,7 +216,6 @@ class UserServiceManager(object): state_date=now, creation_date=now, data = '', deployed_service = deployedServicePublication.deployed_service, user = None, in_use = False ) - @transaction.atomic def __createAssignedAtDb(self, deployedServicePublication, user): ''' Private method to instatiate an assigned element at database with default state @@ -238,7 +237,6 @@ class UserServiceManager(object): state_date=now, creation_date=now, data='', publication=None, user=user, in_use=False) - @transaction.atomic def createCacheFor(self, deployedServicePublication, cacheLevel): ''' Creates a new cache for the deployed service publication at level indicated @@ -270,7 +268,6 @@ class UserServiceManager(object): return assigned - @transaction.atomic def createAssignable(self, ds, deployed, user): ''' Creates an assignable service @@ -304,7 +301,6 @@ class UserServiceManager(object): cache.setState(State.PREPARING) UserServiceOpChecker.makeUnique(cache, ci, state) - transaction.commit() @transaction.atomic def cancel(self, uService): diff --git a/server/src/uds/core/util/Storage.py b/server/src/uds/core/util/Storage.py index 1823812a..7baf7e13 100644 --- a/server/src/uds/core/util/Storage.py +++ b/server/src/uds/core/util/Storage.py @@ -51,15 +51,16 @@ class Storage(object): h.update(str(key)) return h.hexdigest() - @transaction.atomic def saveData(self, skey, data, attr1 = None): key = self.__getKey(skey) data = data.encode(Storage.CODEC) attr1 = '' if attr1 == None else attr1 try: - dbStorage.objects.create(owner = self._owner, key = key, data = data, attr1 = attr1 ) + with transaction.atomic(): + dbStorage.objects.create(owner = self._owner, key = key, data = data, attr1 = attr1 ) except Exception: - dbStorage.objects.filter(key=key).update(owner = self._owner, data = data, attr1 = attr1) + with transaction.atomic(): + dbStorage.objects.filter(key=key).update(owner = self._owner, data = data, attr1 = attr1) logger.debug('Key saved') def put(self, skey, data): diff --git a/server/src/uds/core/workers/ClusteredProviderManagement.py b/server/src/uds/core/workers/ClusteredProviderManagement.py index 71fee185..11b71633 100644 --- a/server/src/uds/core/workers/ClusteredProviderManagement.py +++ b/server/src/uds/core/workers/ClusteredProviderManagement.py @@ -176,7 +176,7 @@ class ClusterMigrationTask(DelayedTask): logger.error('Can\'t update state of uService object') class ClusterBalancingTask(DelayedTask): - def __init(self, providerId): + def __init__(self, providerId): super(ClusterBalancingTask, self).__init__() self._id = providerId @@ -240,7 +240,7 @@ class ClusterBalancingJob(Job): frecuency = 90 friendly_name = 'Clustered Providers Balancing job' - def __init(self, environment): + def __init__(self, environment): super(ClusterBalancingJob,self).__init__(environment) def run(self): diff --git a/server/src/uds/core/workers/ServiceCacheUpdater.py b/server/src/uds/core/workers/ServiceCacheUpdater.py index 5c5b954c..446f72f5 100644 --- a/server/src/uds/core/workers/ServiceCacheUpdater.py +++ b/server/src/uds/core/workers/ServiceCacheUpdater.py @@ -69,8 +69,8 @@ class ServiceCacheUpdater(Job): def __notifyRestrain(deployedService): log.doLog(deployedService, log.WARN, 'Deployed service is restrained due to errors', log.INTERNAL) logger.info('Deployed service {0} is restrained, will check this later'.format(deployedService.name)) - - @transaction.atomic + + @transaction.atomic def bestDeployedServiceNeedingCacheUpdate(self): # State filter for cached and inAssigned objects # First we get all deployed services that could need cache generation @@ -168,7 +168,6 @@ class ServiceCacheUpdater(Job): # We also return calculated values so we can reuse then return selected, cachedL1, cachedL2, assigned - @transaction.atomic def growL1Cache(self, ds, cacheL1, cacheL2, assigned): ''' This method tries to enlarge L1 cache. @@ -181,14 +180,15 @@ class ServiceCacheUpdater(Job): # First, we try to assign from L2 cache if cacheL2 > 0: valid = None - for n in ds.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'): - if n.needsOsManager(): - if State.isUsable(n.state) is False or State.isUsable(n.os_state): + with transaction.atomic(): + for n in ds.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'): + if n.needsOsManager(): + if State.isUsable(n.state) is False or State.isUsable(n.os_state): + valid = n + break + else: valid = n break - else: - valid = n - break if valid is not None: valid.moveToLevel(services.UserDeployment.L1_CACHE) @@ -201,7 +201,6 @@ class ServiceCacheUpdater(Job): except: logger.exception('Exception') - @transaction.atomic def growL2Cache(self, ds, cacheL1, cacheL2, assigned): ''' Tries to grow L2 cache of service. @@ -217,7 +216,6 @@ class ServiceCacheUpdater(Job): logger.error(str(e)) # TODO: When alerts are ready, notify this - @transaction.atomic def reduceL1Cache(self, ds, cacheL1, cacheL2, assigned): logger.debug("Reducing L1 cache erasing a service in cache for {0}".format(ds)) # We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation @@ -244,7 +242,6 @@ class ServiceCacheUpdater(Job): cache = cacheItems[0] cache.removeOrCancel() - @transaction.atomic def reduceL2Cache(self, ds, cacheL1, cacheL2, assigned): logger.debug("Reducing L2 cache erasing a service in cache for {0}".format(ds)) if cacheL2 > 0: diff --git a/server/src/uds/core/workers/UserServiceCleaner.py b/server/src/uds/core/workers/UserServiceCleaner.py index 23553548..a99b8c20 100644 --- a/server/src/uds/core/workers/UserServiceCleaner.py +++ b/server/src/uds/core/workers/UserServiceCleaner.py @@ -56,7 +56,6 @@ class UserServiceInfoItemsCleaner(Job): def __init__(self, environment): super(UserServiceInfoItemsCleaner,self).__init__(environment) - @transaction.atomic def run(self): removeFrom = getSqlDatetime() - timedelta(seconds = GlobalConfig.KEEP_INFO_TIME.getInt(True)) logger.debug('Removing information user services from {0}'.format(removeFrom)) @@ -72,7 +71,6 @@ class UserServiceRemover(Job): def __init__(self, environment): super(UserServiceRemover,self).__init__(environment) - @transaction.atomic def run(self): removeFrom = getSqlDatetime() - timedelta(seconds=10) # We keep at least 30 seconds the machine before removing it, so we avoid connections errors removables = UserService.objects.filter(state=State.REMOVABLE, state_date__lt=removeFrom)[0:UserServiceRemover.removeAtOnce] diff --git a/server/src/uds/services/OVirt/OVirtPublication.py b/server/src/uds/services/OVirt/OVirtPublication.py index 37f8d4de..5f97ee96 100644 --- a/server/src/uds/services/OVirt/OVirtPublication.py +++ b/server/src/uds/services/OVirt/OVirtPublication.py @@ -81,7 +81,7 @@ class OVirtPublication(Publication): ''' Realizes the publication of the service ''' - self._name = self.service().sanitizeVmName('UDS Publication' + ' ' + self.dsName() + "-" + str(self.revision())) + self._name = self.service().sanitizeVmName('UDSP ' + self.dsName() + "-" + str(self.revision())) comments = _('UDS pub for {0} at {1}').format( self.dsName(), str(datetime.now()).split('.')[0] ) self._reason = '' # No error, no reason for it self._destroyAfter = 'f'