forked from shaba/openuds
* More about stats
* Added a fix for "broken" Deployed Services that could prevent broker from updating cache of other deployed services if one gets "inoperable". Now we have added a "Restrain" policy, where if a service has 3 or more deploy errors in less than ten minutes, it will be ignored for cache updating until this policy is not satisfied.
This commit is contained in:
parent
837e9a6b6a
commit
2a95cbd4b9
@ -85,11 +85,11 @@ encoding//src/uds/core/util/Storage.py=utf-8
|
||||
encoding//src/uds/core/util/UniqueIDGenerator.py=utf-8
|
||||
encoding//src/uds/core/util/UniqueMacGenerator.py=utf-8
|
||||
encoding//src/uds/core/util/UniqueNameGenerator.py=utf-8
|
||||
encoding//src/uds/core/util/charts.py=utf-8
|
||||
encoding//src/uds/core/util/connection.py=utf-8
|
||||
encoding//src/uds/core/util/log.py=utf-8
|
||||
encoding//src/uds/core/util/modfinder.py=utf-8
|
||||
encoding//src/uds/core/util/stats/__init__.py=utf-8
|
||||
encoding//src/uds/core/util/stats/charts.py=utf-8
|
||||
encoding//src/uds/core/util/stats/counters.py=utf-8
|
||||
encoding//src/uds/core/workers/AssignedAndUnused.py=utf-8
|
||||
encoding//src/uds/core/workers/CacheCleaner.py=utf-8
|
||||
|
@ -32,30 +32,13 @@
|
||||
|
||||
from uds.models import Provider
|
||||
from uds.models import Service
|
||||
from uds.models import DeployedService
|
||||
from uds.models import StatsCounters
|
||||
from uds.models import StatsEvents
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
PROVIDER_OWNER_TYPE, SERVICE_OWNER_TYPE, DEPLOYED_OWNER_TYPE = xrange(3)
|
||||
|
||||
# hierarchy of owners
|
||||
parentsDict = {
|
||||
PROVIDER_OWNER_TYPE: None,
|
||||
SERVICE_OWNER_TYPE: PROVIDER_OWNER_TYPE,
|
||||
DEPLOYED_OWNER_TYPE: SERVICE_OWNER_TYPE,
|
||||
}
|
||||
# Dict to convert objects to owner types
|
||||
# Dict for translations
|
||||
transDict = {
|
||||
DeployedService : DEPLOYED_OWNER_TYPE,
|
||||
Service : SERVICE_OWNER_TYPE,
|
||||
Provider : PROVIDER_OWNER_TYPE
|
||||
}
|
||||
|
||||
|
||||
class StatsManager(object):
|
||||
@ -78,16 +61,8 @@ class StatsManager(object):
|
||||
return StatsManager._manager
|
||||
|
||||
|
||||
def __getOwner(self, fromObject):
|
||||
'''
|
||||
Gets the owner type/id for the specified object, or raises an exception if unknown
|
||||
'''
|
||||
owner_type = transDict[type(fromObject)]
|
||||
owner_id = fromObject.id
|
||||
return (owner_type, owner_id)
|
||||
|
||||
# Counter stats
|
||||
def addCounter(self, toWhat, counterType, counterValue, stamp = None):
|
||||
def addCounter(self, owner_type, owner_id, counterType, counterValue, stamp = None):
|
||||
'''
|
||||
Adds a new counter stats to database.
|
||||
|
||||
@ -103,7 +78,7 @@ class StatsManager(object):
|
||||
|
||||
Nothing
|
||||
'''
|
||||
from uds.models import getSqlDatetime
|
||||
from uds.models import getSqlDatetime, StatsCounters
|
||||
import time
|
||||
|
||||
if stamp is None:
|
||||
@ -113,13 +88,7 @@ class StatsManager(object):
|
||||
stamp = int(time.mktime(stamp.timetuple()))
|
||||
|
||||
try:
|
||||
(owner_type, owner_id) = self.__getOwner(toWhat)
|
||||
except:
|
||||
logger.error('Unhandled stats fo object type {0} and counter type {1}'.format(toWhat, counterType))
|
||||
return
|
||||
|
||||
try:
|
||||
StatsCounters.objects.create(owner_id=owner_id, owner_type=owner_type, counter_type=counterType, value=counterValue, stamp=stamp)
|
||||
StatsCounters.objects.create(owner_type=owner_type, owner_id=owner_id, counter_type=counterType, value=counterValue, stamp=stamp)
|
||||
return True
|
||||
except:
|
||||
logger.error('Exception handling stats saving (maybe database is full?)')
|
||||
@ -143,7 +112,7 @@ class StatsManager(object):
|
||||
'''
|
||||
pass
|
||||
|
||||
def getCounters(self, counterType, **kwargs):
|
||||
def getCounters(self, fromWhat, counterType, **kwargs):
|
||||
'''
|
||||
Retrieves counters from item
|
||||
|
||||
@ -159,6 +128,9 @@ class StatsManager(object):
|
||||
|
||||
Iterator, containing (date, counter) each element
|
||||
'''
|
||||
from uds.models import StatsCounters
|
||||
|
||||
StatsCounters.get_grouped(None, counterType)
|
||||
|
||||
|
||||
def cleanupCounter(self):
|
||||
|
@ -228,12 +228,12 @@ class UserServiceManager(object):
|
||||
|
||||
|
||||
@transaction.commit_on_success
|
||||
def createCacheFor(self, deployedService, cacheLevel):
|
||||
def createCacheFor(self, deployedServicePublication, cacheLevel):
|
||||
'''
|
||||
Creates a new cache for the deployed service publication at level indicated
|
||||
'''
|
||||
logger.debug('Creating a new cache element at level {0} for publication {1}'.format(cacheLevel, deployedService))
|
||||
cache = self.__createCacheAtDb(deployedService, cacheLevel)
|
||||
logger.debug('Creating a new cache element at level {0} for publication {1}'.format(cacheLevel, deployedServicePublication))
|
||||
cache = self.__createCacheAtDb(deployedServicePublication, cacheLevel)
|
||||
ci = cache.getInstance()
|
||||
state = ci.deployForCache(cacheLevel)
|
||||
|
||||
|
@ -220,6 +220,9 @@ class GlobalConfig:
|
||||
# Maximum logs per user service
|
||||
MAX_LOGS_PER_ELEMENT = Config.section(GLOBAL_SECTION).value('maxLogPerElement', '100')
|
||||
|
||||
# Time to restrain a deployed service in case it gives some error at some point
|
||||
RESTRAINT_TIME = Config.section(GLOBAL_SECTION).value('restainTime', '600')
|
||||
|
||||
initDone = False
|
||||
|
||||
@staticmethod
|
||||
@ -250,6 +253,7 @@ class GlobalConfig:
|
||||
GlobalConfig.MAX_INITIALIZING_TIME.get()
|
||||
GlobalConfig.CUSTOM_HTML_LOGIN.get()
|
||||
GlobalConfig.MAX_LOGS_PER_ELEMENT.get()
|
||||
GlobalConfig.RESTRAINT_TIME.get()
|
||||
except:
|
||||
logger.debug('Config table do not exists!!!, maybe we are installing? :-)')
|
||||
|
||||
|
@ -29,4 +29,9 @@
|
||||
'''
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
'''
|
||||
import counters
|
||||
import counters
|
||||
|
||||
|
||||
counters._initializeData()
|
||||
|
||||
|
||||
|
@ -33,6 +33,6 @@
|
||||
# Chart types
|
||||
CHART_TYPE_LINE, CHART_TYPE_AREA, CHART_TYPE_BAR = xrange(2)
|
||||
|
||||
def makeChart(fromWhatStat, **kwargs):
|
||||
def make(fromWhatStat, **kwargs):
|
||||
|
||||
pass
|
@ -35,21 +35,127 @@ import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Posible counters, note that not all are used by every posible type
|
||||
# FIRST_COUNTER_TYPE, LAST_COUNTER_TYPE are just a placeholder for sanity checks
|
||||
(
|
||||
FIRST_COUNTER_TYPE,
|
||||
CT_LOAD, CT_STORAGE, CT_ASSIGNED,CT_CACHE1, CT_CACHE2, CT_INUSE, CT_ERROR,
|
||||
LAST_COUNTER_TYPE
|
||||
) = xrange(9)
|
||||
CT_LOAD, CT_STORAGE, CT_ASSIGNED, CT_INUSE,
|
||||
) = xrange(4)
|
||||
|
||||
__caRead = None
|
||||
__caWrite = None
|
||||
__transDict = None
|
||||
|
||||
|
||||
def addCounter(toObject, counterType, counterValue, stamp = None):
|
||||
if counterType <= FIRST_COUNTER_TYPE or counterType >= LAST_COUNTER_TYPE:
|
||||
logger.error('Counter type is not valid')
|
||||
def addCounter(obj, counterType, counterValue, stamp = None):
|
||||
'''
|
||||
Adds a counter stat to specified object
|
||||
|
||||
Although any counter type can be added to any object, there is a relation that must be observed
|
||||
or, otherway, the stats will not be recoverable at all:
|
||||
|
||||
Object Type Valid Counters Type
|
||||
---------- -------------------
|
||||
Provider CT_LOAD, CT_STORAGE
|
||||
Service None Right now
|
||||
DeployedService CT_ASSIGNED, CT_CACHE1, CT_CACHE2, CT_INUSE, CT_ERROR
|
||||
|
||||
'''
|
||||
if type(obj) not in __caWrite.get(counterType, ()):
|
||||
logger.error('Type {0} does not accepts counter of type {1}',format(type(obj), counterValue))
|
||||
return False
|
||||
|
||||
return statsManager().addCounter(toObject, counterType, counterValue, stamp)
|
||||
return statsManager().addCounter(__transDict[type(obj)], obj.id, counterType, counterValue, stamp)
|
||||
|
||||
|
||||
def getCounters(obj, counterType, **kwargs):
|
||||
|
||||
fnc = __caRead.get(type)
|
||||
|
||||
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Data initialization
|
||||
def _initializeData():
|
||||
'''
|
||||
Initializes dictionaries.
|
||||
|
||||
Hides data from global var space
|
||||
'''
|
||||
from uds.models import Provider, Service, DeployedService
|
||||
|
||||
global __caWrite
|
||||
global __transDict
|
||||
|
||||
__caWrite = {
|
||||
CT_LOAD: (Provider,),
|
||||
CT_STORAGE: (Service,),
|
||||
CT_ASSIGNED: (DeployedService,),
|
||||
CT_INUSE: (DeployedService,),
|
||||
}
|
||||
|
||||
|
||||
# OBtain ids from variups type of object to retrieve stats
|
||||
def get_Id(obj):
|
||||
return obj.id
|
||||
|
||||
def get_P_S_Ids(provider):
|
||||
return (i.id for i in provider.services.all())
|
||||
|
||||
def get_S_DS_Ids(service):
|
||||
return (i.id for i in service.deployedServices.all())
|
||||
|
||||
def get_P_S_DS_Ids(provider):
|
||||
res = ()
|
||||
for i in provider.services.all():
|
||||
res += get_S_DS_Ids(i)
|
||||
return res
|
||||
|
||||
__caRead = {
|
||||
Provider: {
|
||||
CT_LOAD: get_Id,
|
||||
CT_STORAGE: get_P_S_Ids,
|
||||
CT_ASSIGNED: get_P_S_DS_Ids,
|
||||
CT_INUSE: get_P_S_DS_Ids
|
||||
},
|
||||
Service: {
|
||||
CT_STORAGE: get_Id,
|
||||
CT_ASSIGNED: get_S_DS_Ids,
|
||||
CT_INUSE: get_S_DS_Ids
|
||||
},
|
||||
DeployedService: {
|
||||
CT_ASSIGNED: get_Id,
|
||||
CT_INUSE: get_Id
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def _getIds(obj):
|
||||
to = type(obj)
|
||||
|
||||
if to is DeployedService:
|
||||
return to.id;
|
||||
|
||||
if to is Service:
|
||||
return (i.id for i in obj.userServices.all())
|
||||
|
||||
res = ()
|
||||
if to is Provider:
|
||||
for i in obj.services.all():
|
||||
res += _getIds(i)
|
||||
return res
|
||||
return ()
|
||||
|
||||
OT_PROVIDER, OT_SERVICE, OT_DEPLOYED = xrange(3)
|
||||
|
||||
# Dict to convert objects to owner types
|
||||
# Dict for translations
|
||||
__transDict = {
|
||||
DeployedService : OT_DEPLOYED,
|
||||
Service : OT_SERVICE,
|
||||
Provider : OT_PROVIDER
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,8 @@ class ServiceCacheUpdater(Job):
|
||||
if cache is not needed.
|
||||
This is included as a scheduled task that will run every X seconds, and scheduler will keep it so it will be only executed by one backend at a time
|
||||
'''
|
||||
frecuency = GlobalConfig.CACHE_CHECK_DELAY.getInt() # Request run cache manager every configured seconds. If config value is changed, it will be used at next reload
|
||||
frecuency = GlobalConfig.CACHE_CHECK_DELAY.getInt() # Request run cache manager every configured seconds (defaults to 20 seconds).
|
||||
# If config value is changed, it will be used at next reload
|
||||
|
||||
def __init__(self, environment):
|
||||
super(ServiceCacheUpdater,self).__init__(environment)
|
||||
@ -84,6 +85,10 @@ class ServiceCacheUpdater(Job):
|
||||
logger.debug('Stopped cache generation for deployed service with publication running: {0}'.format(ds))
|
||||
continue
|
||||
|
||||
if ds.isRestrained():
|
||||
logger.debug('Deployed service {0} is restrained, will check this later')
|
||||
continue
|
||||
|
||||
# Get data related to actual state of cache
|
||||
inCacheL1 = ds.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L1_CACHE)).count()
|
||||
inCacheL2 = ds.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).count()
|
||||
@ -174,6 +179,8 @@ class ServiceCacheUpdater(Job):
|
||||
except MaxServicesReachedException as e:
|
||||
logger.error(str(e))
|
||||
# TODO: When alerts are ready, notify this
|
||||
except:
|
||||
logger.exception()
|
||||
|
||||
@transaction.autocommit
|
||||
def growL2Cache(self, ds, cacheL1, cacheL2, assigned):
|
||||
|
@ -69,7 +69,10 @@ def getSqlDatetime(unix=False):
|
||||
else:
|
||||
date = datetime.now() # If not know how to get database datetime, returns local datetime (this is fine for sqlite, which is local)
|
||||
|
||||
return int(mktime(date.timetuple()))
|
||||
if unix:
|
||||
return int(mktime(date.timetuple()))
|
||||
else:
|
||||
return date
|
||||
|
||||
|
||||
# Services
|
||||
@ -864,7 +867,34 @@ class DeployedService(models.Model):
|
||||
getConnectionInfo without knowing if it is requested by a DeployedService or an UserService
|
||||
'''
|
||||
return [username, password]
|
||||
|
||||
def isRestrained(self):
|
||||
'''
|
||||
Maybe this deployed service is having problems, and that may block some task in some
|
||||
situations.
|
||||
|
||||
To avoid this, we will use a "restrain" policy, where we restrain a deployed service for,
|
||||
for example, create new cache elements is reduced.
|
||||
|
||||
The policy to check is that if a Deployed Service has 3 errors in the last 20 Minutes (by default), it is
|
||||
considered restrained.
|
||||
|
||||
The time that a service is in restrain mode is 20 minutes by default (1200 secs), but it can be modified
|
||||
at globalconfig variables
|
||||
'''
|
||||
from uds.core.util.Config import GlobalConfig
|
||||
|
||||
if GlobalConfig.RESTRAINT_TIME.getInt() <= 0:
|
||||
return False # Do not perform any restraint check if we set the globalconfig to 0 (or less)
|
||||
|
||||
date = getSqlDatetime() - timedelta(seconds=GlobalConfig.RESTRAINT_TIME.getInt())
|
||||
|
||||
if self.userServices.filter(state=State.ERROR, state_date__ge=date).count() >= 3:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def setState(self, state, save = True):
|
||||
'''
|
||||
Updates the state of this object and, optionally, saves it
|
||||
@ -1194,7 +1224,7 @@ class UserService(models.Model):
|
||||
# We need to keep separated two differents os states so service operations (move beween caches, recover service) do not affects os manager state
|
||||
state = models.CharField(max_length=1, default=State.PREPARING, db_index = True) # We set index so filters at cache level executes faster
|
||||
os_state = models.CharField(max_length=1, default=State.PREPARING) # The valid values for this field are PREPARE and USABLE
|
||||
state_date = models.DateTimeField(auto_now_add=True)
|
||||
state_date = models.DateTimeField(auto_now_add=True, db_index = True)
|
||||
creation_date = models.DateTimeField(db_index = True)
|
||||
data = models.TextField(default='')
|
||||
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name = 'userServices', null=True, blank=True, default = None)
|
||||
@ -1380,7 +1410,7 @@ class UserService(models.Model):
|
||||
return [username, password]
|
||||
|
||||
return ds.osmanager.getInstance().processUserPassword(self, username, password)
|
||||
|
||||
|
||||
def setState(self, state):
|
||||
'''
|
||||
Updates the state of this object and, optionally, saves it
|
||||
|
Loading…
x
Reference in New Issue
Block a user