mirror of
synced 2024-12-22 13:34:04 +03:00
* Adapted transactions on db to new django 1.6 model, much more versatile.
Removed all "autocommit, commit_on_success and commit_manually", and replaced with "transaction.atomic" calls. Of course, with code refinement in some parts to adapt to this new situation * Added Changing Some table types from MyISAM to Mysql after migrations are done ALWAYS. This is not too slow, and will ensure that the database supports transactions where they are needed
This commit is contained in:
@ -149,6 +149,7 @@ encoding//src/uds/migrations/0011_auto__add_statscounters__add_statsevents__chg_
@ -129,21 +129,41 @@ class ModelHandlerMixin(object):
del self._params[key]
except KeyError as e:
raise RequestError('needed parameter not found in data {0}'.format(unicode(e)))
item = self.model.objects.create(**args);
except: # Duplicate key probably
raise RequestError('Element already exists (duplicate key error)')
if len(args) == 0: # create new
isNew = False
item = self.model.objects.create(**args);
res = self.item_as_dict(item)
except: # Duplicate key probably
raise RequestError('Element already exists (duplicate key error)')
elif len(args) == 1:
item = self.model.objects.get(pk=self._args[0]);
# Update "general" values
res = self.item_as_dict(item)
raise RequestError('Element {0} do not exists anymore'.format(self._args[0]))
raise RequestError('incorrect invocation to PUT')
isNew = True
if self._params.has_key('data_type'): # Needs to store instance
item.data_type = self._params['data_type']
item.data = item.getInstance(self._params).serialize()
for key, value in item.getInstance().valuesDict().iteritems():
res[key] = value
except Exception as e:
item.delete() # Remove pre-saved element
raise RequestError(unicode(e))
return {'id': item.id }
return res
def delete(self):
logger.debug('method DELETE for {0}, {1}'.format(self.__class__.__name__, self._args))
@ -31,11 +31,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
from django.dispatch import dispatcher
from django.db.models import signals
# Make sure that all services are "available" at service startup
import services # to make sure that the packages are initialized at this point
import auths # To make sure that the packages are initialized at this point
@ -45,22 +40,3 @@ import dispatchers
import models
def modify_MySQL_storage(sender, **kwargs):
from django.db import connection
cursor = connection.cursor()
innoDbTables = ( models.UserService, models.DeployedService, models.DeployedServicePublication,
models.Scheduler, models.DelayedTask, )
dicTables = { k._meta.db_table: True for k in innoDbTables }
for model in kwargs['created_models']:
if dicTables.has_key(db_table):
stmt = 'ALTER TABLE %s ENGINE=%s' % (db_table,'InnoDB')
# sets charset to utf8
stmt = 'ALTER TABLE %s CHARACTER SET \'utf8\' COLLATE \'utf8_general_ci\'' % db_table
signals.post_syncdb.connect(modify_MySQL_storage, sender=models)
@ -34,7 +34,7 @@ from __future__ import unicode_literals
from django.db import transaction
from django.db.models import Q
from uds.models import DelayedTask as dbDelayedTask
from uds.models import DelayedTask as dbDelayedTask, getSqlDatetime
from uds.core.util.Decorators import retryOnException
from ..Environment import Environment
from socket import gethostname
@ -78,29 +78,28 @@ class DelayedTaskRunner(object):
DelayedTaskRunner._runner = DelayedTaskRunner()
return DelayedTaskRunner._runner
def executeOneDelayedTask(self):
now = datetime.now()
now = getSqlDatetime()
filt = Q(execution_time__lt=now) | Q(insert_date__gt=now)
# If next execution is before now or last execution is in the future (clock changed on this server, we take that task as executable)
taskInstance = None
task = dbDelayedTask.objects.select_for_update().filter(filt).order_by('execution_time')[0]
with transaction.atomic(): # Encloses
task = dbDelayedTask.objects.select_for_update().filter(filt).order_by('execution_time')[0]
taskInstance = loads(task.instance.decode(self.CODEC))
except Exception:
# No task waiting, nice
# Transaction have been rolled back using the "with atomic", so here just return
# Note that is taskInstance can't be loaded, this task will not be retried
if taskInstance != None:
env = Environment.getEnvForType(taskInstance.__class__)
def __insert(self, instance, delay, tag):
now = datetime.now()
now = getSqlDatetime()
exec_time = now + timedelta(seconds = delay)
cls = instance.__class__
instanceDump = dumps(instance).encode(self.CODEC)
@ -120,20 +119,20 @@ class DelayedTaskRunner(object):
except Exception, e:
logger.info('Exception inserting a delayed task {0}: {1}'.format(str(e.__class__), e))
time.sleep(1) # Wait a bit before next try...
# If retries == 0, this is a big error
if retries == 0:
logger.error("Could not insert delayed task!!!! {0} {1} {2}".format(instance, delay, tag))
return False
return True
def remove(self, tag):
except Exception as e:
logger.exception('Exception removing a delayed task {0}: {1}'.format(str(e.__class__), e))
def checkExists(self, tag):
if tag == '' or tag is None:
@ -62,13 +62,12 @@ class JobThread(threading.Thread):
done = True
# Erased from database, nothing hapens
# logger.exception(e)
# Databases locked, maybe because we are on a multitask environment, let's try again in a while
logger.info('Database access locked... Retrying')
def __updateDb(self):
job = dbScheduler.objects.select_for_update().get(id=self._dbJobId)
job.state = State.FOR_EXECUTE
@ -96,34 +95,31 @@ class Scheduler(object):
def notifyTermination(self):
self._keepRunning = False
def executeOneJob(self):
Looks for a job and executes it
jobInstance = None
now = getSqlDatetime() # Datetimes are based on database server times
filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now))
# If next execution is before now or last execution is in the future (clock changed on this server, we take that task as executable)
# This params are all set inside filter (look at __init__)
job = dbScheduler.objects.select_for_update().filter(filter).order_by('next_execution')[0]
jobInstance = job.getInstance()
if jobInstance == None:
logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job))
logger.debug('Executing job:>{0}<'.format(job.name))
job.state = State.RUNNING
job.owner_server = self._hostname
job.last_execution = now
with transaction.atomic():
now = getSqlDatetime() # Datetimes are based on database server times
filter = Q(state = State.FOR_EXECUTE) & (Q(owner_server = self._hostname) | Q(owner_server = '')) & (Q(last_execution__gt = now) | Q(next_execution__lt = now))
# If next execution is before now or last execution is in the future (clock changed on this server, we take that task as executable)
# This params are all set inside filter (look at __init__)
job = dbScheduler.objects.select_for_update().filter(filter).order_by('next_execution')[0]
jobInstance = job.getInstance()
if jobInstance == None:
logger.error('Job instance can\'t be resolved for {0}, removing it'.format(job))
logger.debug('Executing job:>{0}<'.format(job.name))
job.state = State.RUNNING
job.owner_server = self._hostname
job.last_execution = now
JobThread(jobInstance, job).start() # Do not instatiate thread, just run it
except IndexError:
# Do nothing, there is no jobs for execution
except DatabaseError:
@ -131,10 +127,9 @@ class Scheduler(object):
# This in fact means that we have to retry operation, and retry will happen on main loop
# Look at this http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
# I have got some deadlock errors, but looking at that url, i found that it is not so abnormal
logger.debug('Deadlock, no problem at all :-) (sounds hards, but really, no problem)')
transaction.rollback() # So django do not complains about this
logger.debug('Deadlock, no problem at all :-) (sounds hards, but really, no problem, will retry later :-) )')
def releaseOwnShedules(self):
Releases all scheduleds being executed by this scheduler
@ -69,7 +69,7 @@ class PublicationLauncher(DelayedTask):
self._publishId = publish.id
def run(self):
@ -147,7 +147,7 @@ class PublicationFinishChecker(DelayedTask):
DelayedTaskRunner.runner().insert(PublicationFinishChecker(dsp), pi.suggestedTime, PUBTAG + str(dsp.id))
def run(self):
logger.debug('Checking publication finished {0}'.format(self._publishId))
try :
@ -175,7 +175,7 @@ class PublicationManager(object):
return PublicationManager._manager
def publish(self, deployedService):
if deployedService.publications.select_for_update().filter(state__in=State.PUBLISH_STATES).count() > 0:
raise PublishException(_('Already publishing. Wait for previous publication to finish and try again'))
@ -187,7 +187,7 @@ class PublicationManager(object):
logger.debug('Caught exception at publish: {0}'.format(e))
raise PublishException(str(e))
def cancel(self,dsp):
dsp = DeployedServicePublication.objects.select_for_update().get(id=dsp.id)
if dsp.state not in State.PUBLISH_STATES:
@ -207,6 +207,7 @@ class PublicationManager(object):
except Exception, e:
raise PublishException(str(e))
def unpublish(self, dsp):
if State.isUsable(dsp.state) == False and State.isRemovable(dsp.state) == False:
raise PublishException(_('Can\'t unpublish non usable publication'))
@ -57,6 +57,7 @@ class UserServiceOpChecker(DelayedTask):
self._state = service.state
def makeUnique(userService, userServiceInstance, state):
This method makes sure that there will be only one delayedtask related to the userService indicated
@ -65,6 +66,7 @@ class UserServiceOpChecker(DelayedTask):
UserServiceOpChecker.checkAndUpdateState(userService, userServiceInstance, state)
def checkAndUpdateState(userService, userServiceInstance, state):
Checks the value returned from invocation to publish or checkPublishingState, updating the dsp database object
@ -124,6 +126,7 @@ class UserServiceOpChecker(DelayedTask):
def checkLater(userService, ci):
Inserts a task in the delayedTaskRunner so we can check the state of this publication
@ -134,9 +137,8 @@ class UserServiceOpChecker(DelayedTask):
if DelayedTaskRunner.runner().checkExists(USERSERVICE_TAG + str(userService.id)):
DelayedTaskRunner.runner().insert(UserServiceOpChecker(userService), ci.suggestedTime, USERSERVICE_TAG + str(userService.id))
def run(self):
logger.debug('Checking user service finished {0}'.format(self._svrId))
uService = None
@ -145,9 +147,7 @@ class UserServiceOpChecker(DelayedTask):
if uService.state != self._state:
logger.debug('Task overrided by another task (state of item changed)')
# This item is no longer valid, returning will not check it again (no checkLater called)
ci = uService.getInstance()
logger.debug("uService instance class: {0}".format(ci.__class__))
state = ci.checkState()
@ -164,7 +164,6 @@ class UserServiceOpChecker(DelayedTask):
except Exception:
logger.error('Can\'t update state of uService object')
class UserServiceManager(object):
@ -188,7 +187,7 @@ class UserServiceManager(object):
return Q(state__in=[State.PREPARING, State.USABLE])
def __checkMaxDeployedReached(self, deployedService):
Checks if maxDeployed for the service has been reached, and, if so,
@ -207,6 +206,7 @@ class UserServiceManager(object):
def __createCacheAtDb(self, deployedServicePublication, cacheLevel):
Private method to instatiate a cache element at database with default states
@ -218,6 +218,7 @@ class UserServiceManager(object):
state_date=now, creation_date=now, data = '', deployed_service = deployedServicePublication.deployed_service,
user = None, in_use = False )
def __createAssignedAtDb(self, deployedServicePublication, user):
Private method to instatiate an assigned element at database with default state
@ -227,6 +228,7 @@ class UserServiceManager(object):
return deployedServicePublication.userServices.create(cache_level=0, state=State.PREPARING, os_state=State.PREPARING,
state_date=now, creation_date=now, data='', deployed_service=deployedServicePublication.deployed_service, user=user, in_use=False)
def __createAssignedAtDbForNoPublication(self, deployedService, user):
__createCacheAtDb and __createAssignedAtDb uses a publication for create the UserService.
@ -239,7 +241,7 @@ class UserServiceManager(object):
state_date=now, creation_date=now, data='', publication=None, user=user, in_use=False)
def createCacheFor(self, deployedServicePublication, cacheLevel):
Creates a new cache for the deployed service publication at level indicated
@ -252,7 +254,7 @@ class UserServiceManager(object):
UserServiceOpChecker.checkAndUpdateState(cache, ci, state)
return cache
def createAssignedFor(self, ds, user):
Creates a new assigned deployed service for the publication and user indicated
@ -272,7 +274,7 @@ class UserServiceManager(object):
return assigned
def createAssignable(self, ds, deployed, user):
Creates an assignable service
@ -290,7 +292,7 @@ class UserServiceManager(object):
def moveToLevel(self, cache, cacheLevel):
Moves a cache element from one level to another
@ -308,7 +310,7 @@ class UserServiceManager(object):
UserServiceOpChecker.makeUnique(cache, ci, state)
def cancel(self, uService):
Cancels a user service creation
@ -329,7 +331,7 @@ class UserServiceManager(object):
return uService
def remove(self, uService):
Removes a uService element
@ -353,12 +355,12 @@ class UserServiceManager(object):
raise OperationException(_('Can\'t remove nor cancel {0} cause its states doesn\'t allows it'))
def removeInfoItems(self, dsp):
def getAssignationForUser(self, ds, user):
# First, we try to locate an already assigned service
existing = ds.assignedUserServices().filter(user=user,state__in=State.VALID_STATES)
@ -427,7 +429,7 @@ class UserServiceManager(object):
return False
return True
def isReady(self, uService):
uService = UserService.objects.select_for_update().get(id=uService.id)
@ -32,6 +32,8 @@
from __future__ import unicode_literals
from django.db import transaction
from uds.core.util.Config import GlobalConfig
from uds.models import DeployedService, getSqlDatetime
from uds.core.util.State import State
@ -52,15 +54,16 @@ class AssignedAndUnused(Job):
since_state = getSqlDatetime() - timedelta( seconds = GlobalConfig.CHECK_UNUSED_TIME.getInt() )
for ds in DeployedService.objects.all():
# If do not needs os manager, this is
if ds.osmanager is not None:
osm = ds.osmanager.getInstance()
if osm.processUnusedMachines is True:
logger.debug('Processing unused services for {0}'.format(osm))
with transaction.atomic():
if ds.osmanager is not None:
osm = ds.osmanager.getInstance()
if osm.processUnusedMachines is True:
logger.debug('Processing unused services for {0}'.format(osm))
for us in ds.assignedUserServices().select_for_update().filter(in_use=False,state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
logger.debug('Found unused assigned service {0}'.format(us))
else: # No os manager, simply remove unused services in specified time
for us in ds.assignedUserServices().select_for_update().filter(in_use=False,state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
logger.debug('Found unused assigned service {0}'.format(us))
else: # No os manager, simply remove unused services in specified time
for us in ds.assignedUserServices().select_for_update().filter(in_use=False,state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
logger.debug('Found unused assigned service {0}'.format(us))
@ -105,6 +105,7 @@ class ClusterMigrationTask(DelayedTask):
self._state = service.state
def checkAndUpdateState(userService, userServiceInstance, state):
Checks the value returned from invocation to publish or checkPublishingState, updating the dsp database object
@ -146,7 +147,7 @@ class ClusterMigrationTask(DelayedTask):
DelayedTaskRunner.runner().insert(ClusterUpdateStats(userService), userServiceInstance.suggestedTime, ClusterUpdateStats + str(userService.id))
def run(self):
logger.debug('Checking user service finished migrating {0}'.format(self._serviceId))
uService = None
@ -155,7 +156,6 @@ class ClusterMigrationTask(DelayedTask):
if uService.state != self._state:
logger.debug('Task overrided by another task (state of item changed)')
# This item is no longer valid, returning will not check it again (no checkLater called)
ci = uService.getInstance()
@ -174,7 +174,6 @@ class ClusterMigrationTask(DelayedTask):
except Exception:
logger.error('Can\'t update state of uService object')
class ClusterBalancingTask(DelayedTask):
def __init(self, providerId):
@ -182,7 +181,7 @@ class ClusterBalancingTask(DelayedTask):
self._id = providerId
def migrate(serviceId, toNode):
service = UserService.objects.select_for_update().get(pk=serviceId)
@ -203,7 +202,6 @@ class ClusterBalancingTask(DelayedTask):
logger.exception('Setting error state at migration init')
def run(self):
@ -61,7 +61,7 @@ class DeployedServiceRemover(Job):
def __init__(self, environment):
def startRemovalOf(self, ds):
# Get publications in course...., can be at most 1!!!
logger.debug('Removal process of {0}'.format(ds))
@ -80,7 +80,7 @@ class DeployedServiceRemover(Job):
def continueRemovalOf(self, ds):
# First, we remove all publications and user services in "info_state"
@ -70,7 +70,7 @@ class ServiceCacheUpdater(Job):
log.doLog(deployedService, log.WARN, 'Deployed service is restrained due to errors', log.INTERNAL)
logger.info('Deployed service {0} is restrained, will check this later'.format(deployedService.name))
def bestDeployedServiceNeedingCacheUpdate(self):
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
@ -168,7 +168,7 @@ class ServiceCacheUpdater(Job):
# We also return calculated values so we can reuse then
return selected, cachedL1, cachedL2, assigned
def growL1Cache(self, ds, cacheL1, cacheL2, assigned):
This method tries to enlarge L1 cache.
@ -201,7 +201,7 @@ class ServiceCacheUpdater(Job):
def growL2Cache(self, ds, cacheL1, cacheL2, assigned):
Tries to grow L2 cache of service.
@ -217,6 +217,7 @@ class ServiceCacheUpdater(Job):
# TODO: When alerts are ready, notify this
def reduceL1Cache(self, ds, cacheL1, cacheL2, assigned):
logger.debug("Reducing L1 cache erasing a service in cache for {0}".format(ds))
# We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation
@ -243,6 +244,7 @@ class ServiceCacheUpdater(Job):
cache = cacheItems[0]
def reduceL2Cache(self, ds, cacheL1, cacheL2, assigned):
logger.debug("Reducing L2 cache erasing a service in cache for {0}".format(ds))
if cacheL2 > 0:
@ -56,7 +56,7 @@ class UserServiceInfoItemsCleaner(Job):
def __init__(self, environment):
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds = GlobalConfig.KEEP_INFO_TIME.getInt(True))
logger.debug('Removing information user services from {0}'.format(removeFrom))
@ -72,7 +72,7 @@ class UserServiceRemover(Job):
def __init__(self, environment):
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds=10) # We keep at least 30 seconds the machine before removing it, so we avoid connections errors
removables = UserService.objects.filter(state=State.REMOVABLE, state_date__lt=removeFrom)[0:UserServiceRemover.removeAtOnce]
@ -0,0 +1,26 @@
from __future__ import unicode_literals
from south import signals
from uds import models
import logging
logger = logging.getLogger(__name__)
# Ensure tables that needs to be in InnoDB are so
def modify_MySQL_storage(sender, **kwargs):
from django.db import connection
cursor = connection.cursor()
logger.info('Converting table')
innoDbTables = ( models.UserService, models.DeployedService, models.DeployedServicePublication,
models.Scheduler, models.DelayedTask, )
for model in innoDbTables:
stmt = 'ALTER TABLE %s ENGINE=%s' % (db_table,'InnoDB')
# sets charset to utf8
stmt = 'ALTER TABLE %s CHARACTER SET \'utf8\' COLLATE \'utf8_general_ci\'' % db_table
@ -164,9 +164,8 @@ gui.connectivity.link = function(event) {
gui.connectivity.transports.rest.create(fields, function(data) { // Success on put
}, function(jqXHR, textStatus, errorThrown) { // fail on put
gui.launchModal(gettext('Error creating transport'), jqXHR.responseText, ' ');
}, gui.failRequestModalFnc(gettext('Error creating transport')) // Fail on put, show modal message
@ -1,4 +1,5 @@
/* jshint strict: true */
// Operations commmon to most elements
function BasicGuiElement(name) {
"use strict";
this.name = name;
@ -79,6 +80,10 @@ GuiElement.prototype = {
var tableId = this.name + '-table';
var $this = this; // Store this for child functions
// ---------------
// Cells renderers
// ---------------
// Empty cells transform
var renderEmptyCell = function(data) {
if( data === '' )
@ -119,7 +124,8 @@ GuiElement.prototype = {
return dict[data] || renderEmptyCell('');
this.rest.tableInfo(function(data) {
this.rest.tableInfo(function(data) { // Gets tableinfo data (columns, title, visibility of fields, etc...
var title = data.title;
var columns = [];
$.each(data.fields, function(index, value) {
@ -179,7 +185,7 @@ GuiElement.prototype = {
columns: columns,
$this.rest.overview(function(data) {
$this.rest.overview(function(data) { // Gets "overview" data for table (table contents, but resume form)
var table = gui.table(title, tableId);
if (options.container === undefined) {
gui.appendToWorkspace('<div class="row"><div class="col-lg-12">' + table.text + '</div></div>');
@ -199,21 +205,21 @@ GuiElement.prototype = {
if( data.length > 1000 )
$this.rest.overview(function(data) {
$this.rest.overview(function(data) { // Restore overview
setTimeout( function() {
}, 0);
}); // End restore overview
return false; // This may be used on button or href, better disable execution of it
var btns = [];
if (options.buttons) {
// Generic click handler generator for this table
var clickHandlerFor = function(handler, action, newHandler) {
var handleFnc = handler || function(val, action, tbl) {gui.doLog('Default handler called for ', action);};
return function(btn) {
@ -247,7 +253,7 @@ GuiElement.prototype = {
$.each(options.buttons, function(index, value) {
$.each(options.buttons, function(index, value) { // Iterate through button definition
var btn;
switch (value) {
case 'new':
@ -313,10 +319,10 @@ GuiElement.prototype = {
case 'xls':
btn = {
btn = {
"sExtends" : "text",
"sButtonText" : gui.config.dataTableButtons.xls.text,
"fnClick" : function(){
"fnClick" : function() { // Export to excel
api.templates.get('spreadsheet', function(tmpl) {
var styles = { 'bold': 's21', };
var uri = 'data:application/vnd.ms-excel;base64,',
@ -354,7 +360,7 @@ GuiElement.prototype = {
{type: 'application/vnd.ms-excel'} ), title + '.xls');
}, 20);
}, // End export to excell
"sButtonClass" : gui.config.dataTableButtons.xls.css,
@ -362,7 +368,7 @@ GuiElement.prototype = {
if(btn) {
}); // End buttoon iteration
// Initializes oTableTools
@ -420,8 +426,8 @@ GuiElement.prototype = {
if( options.onLoad ) {
}); // End Overview data
}); // End Tableinfo data
return '#' + tableId;
@ -112,7 +112,7 @@
gui.appendToWorkspace(gui.modal(id, title, content, actionButton, closeButton));
id = '#' + id; // for jQuery
$(id).modal({keyboard: false})
.on('hidden.bs.modal', function () {
@ -165,7 +165,7 @@
// Launch modal
$(id).modal({keyboard: false})
.on('hidden.bs.modal', function () {
@ -181,6 +181,12 @@
gui.failRequestModalFnc = function(title) {
return function(jqXHR, textStatus, errorThrown) { // fail on put
gui.launchModal(title, jqXHR.responseText, ' ');
gui.clearWorkspace = function() {
Reference in New Issue
Block a user