Reformatted & minor updates for workers

Adolfo Gómez García 2020-11-27 11:12:06 +01:00
parent f2d55d6141
commit 501565c088
13 changed files with 428 additions and 171 deletions

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -39,7 +39,7 @@ import logging
logger = logging.getLogger(__name__)
def initialize():
def initialize() -> None:
"""
This imports all packages that are descendants of this package and, after that,
it registers all subclasses of service provider as
@ -56,9 +56,7 @@ def initialize():
importlib.invalidate_caches()
p = jobs.Job
# This is marked as error in IDE, but it's not (__subclasses__)
for cls in p.__subclasses__():
for cls in jobs.Job.__subclasses__():
logger.debug('Examining worker %s', cls.__module__)
# Limit autoregistration to worker jobs inside this module
if cls.__module__[0:16] == 'uds.core.workers':
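
A note on the pattern this hunk cleans up: initialize() imports every submodule of the package so each Job subclass gets defined, then walks Job.__subclasses__() and keeps only classes living under uds.core.workers. A minimal standalone sketch of the idea (the Job stand-in and _registry dict here are illustrative, not the real uds.core.jobs API; startswith() is the idiomatic form of the cls.__module__[0:16] slice comparison):

import typing

class Job:
    """Stand-in for uds.core.jobs.Job."""

_registry: typing.Dict[str, typing.Type[Job]] = {}

def autoregister(prefix: str = 'uds.core.workers') -> None:
    # Any Job subclass defined in an already-imported module shows up here
    for cls in Job.__subclasses__():
        # Keep only worker jobs defined inside the given package
        if cls.__module__.startswith(prefix):
            _registry[cls.__name__] = cls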

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -48,18 +48,20 @@ class AssignedAndUnused(Job):
friendly_name = 'Unused services checker'
def run(self) -> None:
since_state = getSqlDatetime() - timedelta(seconds=GlobalConfig.CHECK_UNUSED_TIME.getInt())
since_state = getSqlDatetime() - timedelta(
seconds=GlobalConfig.CHECK_UNUSED_TIME.getInt()
)
# Locate service pools with assigned services that are no longer in use
outdatedServicePools = ServicePool.objects.annotate(
outdated=Count(
'userServices',
'userServices',
filter=Q(
userServices__in_use=False,
userServices__state_date__lt=since_state,
userServices__state=State.USABLE,
userServices__os_state=State.USABLE,
userServices__cache_level=0
)
userServices__cache_level=0,
),
)
).filter(outdated__gt=0, state=State.ACTIVE)
for ds in outdatedServicePools:
@ -70,11 +72,25 @@ class AssignedAndUnused(Job):
if ds.osmanager:
osm = ds.osmanager.getInstance()
if osm.processUnusedMachines is True:
logger.debug('Processing unused services for %s, %s', ds, ds.osmanager)
for us in ds.assignedUserServices().filter(in_use=False, state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
logger.debug(
'Processing unused services for %s, %s', ds, ds.osmanager
)
for us in ds.assignedUserServices().filter(
in_use=False,
state_date__lt=since_state,
state=State.USABLE,
os_state=State.USABLE,
):
logger.debug('Found unused assigned service %s', us)
osm.processUnused(us)
else: # No OS manager, simply remove services unused for the specified time
for us in ds.assignedUserServices().filter(in_use=False, state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
logger.debug('Found unused assigned service with no OS Manager %s', us)
for us in ds.assignedUserServices().filter(
in_use=False,
state_date__lt=since_state,
state=State.USABLE,
os_state=State.USABLE,
):
logger.debug(
'Found unused assigned service with no OS Manager %s', us
)
us.remove()
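
The reformatted query above is Django conditional aggregation: Count('userServices', filter=Q(...)) counts, per pool, only the related rows matching the Q condition, and the outer filter(outdated__gt=0) keeps just the pools with at least one match. A trimmed sketch of the pattern (ServicePool import path assumed, condition reduced to a single field):

from django.db.models import Count, Q

from uds.models import ServicePool  # assumed import path

outdated_pools = ServicePool.objects.annotate(
    # Count only the related user services matching the condition
    outdated=Count('userServices', filter=Q(userServices__in_use=False)),
).filter(outdated__gt=0)  # keep pools that have at least one such service

This runs as a single SQL query with a filtered COUNT, instead of one count query per pool.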

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -49,21 +48,34 @@ class HangedCleaner(Job):
friendly_name = 'Hanged services checker'
def run(self):
since_state = getSqlDatetime() - timedelta(seconds=GlobalConfig.MAX_INITIALIZING_TIME.getInt())
since_state = getSqlDatetime() - timedelta(
seconds=GlobalConfig.MAX_INITIALIZING_TIME.getInt()
)
# Filter for locating machines not ready
flt = (
Q(state_date__lt=since_state, state=State.PREPARING) |
Q(state_date__lt=since_state, state=State.USABLE, os_state=State.PREPARING)
flt = Q(state_date__lt=since_state, state=State.PREPARING) | Q(
state_date__lt=since_state, state=State.USABLE, os_state=State.PREPARING
)
withHangedServices = ServicePool.objects.annotate(
hanged = Count(
'userServices',
# Rewritten filter for servicePool
filter=Q(userServices__state_date__lt=since_state, userServices__state=State.PREPARING) |
Q(userServices__state_date__lt=since_state, userServices__state=State.USABLE, userServices__os_state=State.PREPARING)
withHangedServices = (
ServicePool.objects.annotate(
hanged=Count(
'userServices',
# Rewritten filter for servicePool
filter=Q(
userServices__state_date__lt=since_state,
userServices__state=State.PREPARING,
)
| Q(
userServices__state_date__lt=since_state,
userServices__state=State.USABLE,
userServices__os_state=State.PREPARING,
),
)
)
).exclude(hanged=0).exclude(service__provider__maintenance_mode=True).filter(state=State.ACTIVE)
.exclude(hanged=0)
.exclude(service__provider__maintenance_mode=True)
.filter(state=State.ACTIVE)
)
# Type
servicePool: ServicePool
@ -72,9 +84,22 @@ class HangedCleaner(Job):
logger.debug('Searching for hanged services for %s', servicePool)
us: UserService
for us in servicePool.userServices.filter(flt):
if us.getProperty('destroy_after'): # It's waiting for removal, skip this very specific case
if us.getProperty(
'destroy_after'
): # It's waiting for removal, skip this very specific case
continue
logger.debug('Found hanged service %s', us)
log.doLog(us, log.ERROR, 'User Service seems to be hanged. Removing it.', log.INTERNAL)
log.doLog(servicePool, log.ERROR, 'Removing user service {} because it seems to be hanged'.format(us.friendly_name))
log.doLog(
us,
log.ERROR,
'User Service seems to be hanged. Removing it.',
log.INTERNAL,
)
log.doLog(
servicePool,
log.ERROR,
'Removing user service {} because it seems to be hanged'.format(
us.friendly_name
),
)
us.removeOrCancel()
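
Note that the same "hanged" condition appears twice in this worker: once as the flt Q object applied per pool (servicePool.userServices.filter(flt)), and once rewritten with userServices__ prefixes inside the Count annotation. Q objects are composable filter trees, so the OR only has to be spelled out once per level; a minimal sketch of building it (State import path assumed):

from django.db.models import Q

from uds.core.util.state import State  # assumed import path

def hanged_filter(since_state) -> Q:
    # Stuck while PREPARING, or USABLE but with its OS still PREPARING
    return Q(state_date__lt=since_state, state=State.PREPARING) | Q(
        state_date__lt=since_state, state=State.USABLE, os_state=State.PREPARING
    )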

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -46,21 +45,34 @@ logger = logging.getLogger(__name__)
class PublicationInfoItemsCleaner(Job):
frecuency = 3607
frecuency_cfg = GlobalConfig.CLEANUP_CHECK # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.CLEANUP_CHECK
) # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'Publications Info Cleaner'
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True))
ServicePoolPublication.objects.filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
removeFrom = getSqlDatetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True)
)
ServicePoolPublication.objects.filter(
state__in=State.INFO_STATES, state_date__lt=removeFrom
).delete()
class PublicationCleaner(Job):
frecuency = 31
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run publication "removal" every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.REMOVAL_CHECK
) # Request run publication "removal" every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'Publication Cleaner'
def run(self):
removables: typing.Iterable[ServicePoolPublication] = ServicePoolPublication.objects.filter(state=State.REMOVABLE, deployed_service__service__provider__maintenance_mode=False)
removables: typing.Iterable[
ServicePoolPublication
] = ServicePoolPublication.objects.filter(
state=State.REMOVABLE,
deployed_service__service__provider__maintenance_mode=False,
)
for removable in removables:
try:
publicationManager().unpublish(removable)

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -47,12 +46,20 @@ class ScheduledAction(Job):
def run(self):
configuredAction: CalendarAction
for configuredAction in CalendarAction.objects.filter(
service_pool__service__provider__maintenance_mode=False, # Avoid maintenance
service_pool__state=states.servicePool.ACTIVE, # Avoid Non active pools
next_execution__lt=getSqlDatetime()
).order_by('next_execution'):
logger.info('Executing calendar action %s.%s (%s)', configuredAction.service_pool.name, configuredAction.calendar.name, configuredAction.action)
service_pool__service__provider__maintenance_mode=False, # Avoid maintenance
service_pool__state=states.servicePool.ACTIVE, # Avoid Non active pools
next_execution__lt=getSqlDatetime(),
).order_by('next_execution'):
logger.info(
'Executing calendar action %s.%s (%s)',
configuredAction.service_pool.name,
configuredAction.calendar.name,
configuredAction.action,
)
try:
configuredAction.execute()
except Exception:
logger.exception('Got an exception executing calendar access action: %s', configuredAction)
logger.exception(
'Got an exception executing calendar access action: %s',
configuredAction,
)

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -41,17 +40,23 @@ from uds.core.jobs import Job
logger = logging.getLogger(__name__)
MAX_EXECUTION_MINUTES = 15 # Minutes
class SchedulerHousekeeping(Job):
"""
Ensures no task is executed for more than 15 minutes
"""
frecuency = 301 # Frequency for this job
friendly_name = 'Scheduler house keeping'
def run(self):
"""
Look for "hanged" scheduler tasks and reset them
Look for "hanged" scheduler tasks and reschedule them
"""
since = getSqlDatetime() - timedelta(minutes=15)
since = getSqlDatetime() - timedelta(minutes=MAX_EXECUTION_MINUTES)
with transaction.atomic():
Scheduler.objects.select_for_update().filter(last_execution__lt=since, state=State.RUNNING).update(owner_server='', state=State.FOR_EXECUTE)
Scheduler.objects.select_for_update().filter(
last_execution__lt=since, state=State.RUNNING
).update(owner_server='', state=State.FOR_EXECUTE)
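
One detail worth spelling out here: select_for_update() requires an enclosing transaction.atomic() block, and its FOR UPDATE lock is taken when the queryset is evaluated. In the chained .update() form above, the UPDATE statement itself locks the modified rows until commit; the explicit lock-then-modify shape, useful when rows must be inspected before changing them, looks like this (a sketch with assumed import paths, not the actual UDS code):

from django.db import transaction

from uds.core.util.state import State  # assumed import path
from uds.models import Scheduler  # assumed import path

def reclaim_hanged_tasks(since) -> int:
    reclaimed = 0
    with transaction.atomic():
        # SELECT ... FOR UPDATE: matched rows stay locked until commit,
        # so a concurrent housekeeper blocks instead of double-reclaiming
        for task in Scheduler.objects.select_for_update().filter(
            last_execution__lt=since, state=State.RUNNING
        ):
            task.owner_server = ''
            task.state = State.FOR_EXECUTE
            task.save(update_fields=['owner_server', 'state'])
            reclaimed += 1
    return reclaimed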

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -45,21 +44,31 @@ logger = logging.getLogger(__name__)
class DeployedServiceInfoItemsCleaner(Job):
frecuency = 3607
frecuency_cfg = GlobalConfig.CLEANUP_CHECK # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.CLEANUP_CHECK
) # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'Deployed Service Info Cleaner'
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt())
ServicePool.objects.filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
removeFrom = getSqlDatetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt()
)
ServicePool.objects.filter(
state__in=State.INFO_STATES, state_date__lt=removeFrom
).delete()
class DeployedServiceRemover(Job):
frecuency = 31
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run publication "removal" every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.REMOVAL_CHECK
) # Request run publication "removal" every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'Deployed Service Cleaner'
def startRemovalOf(self, servicePool: ServicePool):
if servicePool.service is None: # Maybe an inconsistent value? (should not happen, but without referential integrity in the DB, someone may have "touched" it ;))
if (
servicePool.service is None
): # Maybe an inconsistent value? (should not happen, but without referential integrity in the DB, someone may have "touched" it ;))
logger.error('Found service pool %s without service', servicePool.name)
servicePool.delete() # Just remove it "a las bravas", the best we can do
return
@ -71,7 +80,9 @@ class DeployedServiceRemover(Job):
for pub in publishing:
pub.cancel()
# Now all publications are canceling, let's try to cancel cache and assigned
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(state=State.PREPARING)
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(
state=State.PREPARING
)
for userService in uServices:
logger.debug('Canceling %s', userService)
userService.cancel()
@ -90,7 +101,9 @@ class DeployedServiceRemover(Job):
try:
# Now all publications are canceling, let's try to cancel cache and assigned also
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(state=State.PREPARING)
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(
state=State.PREPARING
)
for userService in uServices:
logger.debug('Canceling %s', userService)
userService.cancel()
@ -99,13 +112,17 @@ class DeployedServiceRemover(Job):
# First, we remove all publications and user services in "info_state"
with transaction.atomic():
servicePool.userServices.select_for_update().filter(state__in=State.INFO_STATES).delete()
servicePool.userServices.select_for_update().filter(
state__in=State.INFO_STATES
).delete()
# Mark usable user services as removable
now = getSqlDatetime()
with transaction.atomic():
servicePool.userServices.select_for_update().filter(state=State.USABLE).update(state=State.REMOVABLE, state_date=now)
servicePool.userServices.select_for_update().filter(
state=State.USABLE
).update(state=State.REMOVABLE, state_date=now)
# When no services remain in the database, we start with publications
if servicePool.userServices.all().count() == 0:
@ -115,8 +132,12 @@ class DeployedServiceRemover(Job):
logger.debug('Active publication found, unpublishing it')
servicePool.unpublish()
else:
logger.debug('No active publication found, removing info states and checking if removal is done')
servicePool.publications.filter(state__in=State.INFO_STATES).delete()
logger.debug(
'No active publication found, removing info states and checking if removal is done'
)
servicePool.publications.filter(
state__in=State.INFO_STATES
).delete()
if servicePool.publications.count() == 0:
servicePool.removed() # Mark it as removed, clean later from database
except Exception:
@ -124,7 +145,9 @@ class DeployedServiceRemover(Job):
def run(self):
# First check if there is any pool in "removable" state
removableServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.REMOVABLE)[:10]
removableServicePools: typing.Iterable[
ServicePool
] = ServicePool.objects.filter(state=State.REMOVABLE)[:10]
for servicePool in removableServicePools:
try:
# Skips checking deployed services in maintenance mode
@ -137,7 +160,9 @@ class DeployedServiceRemover(Job):
except Exception as e2:
logger.error('Could not delete %s', e2)
removingServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.REMOVING)[:10]
removingServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(
state=State.REMOVING
)[:10]
for servicePool in removingServicePools:
try:
# Skips checking deployed services in maintenance mode
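
Stepping back, the hunks in this file implement a staged teardown of a service pool. Condensed as an outline (an illustrative sketch of the sequence with assumed import paths; the real logic lives in startRemovalOf and the continuation method of this worker):

from uds.core.util.state import State  # assumed import path

def start_removal(pool) -> None:
    # 1) cancel publications still being prepared
    for pub in pool.publications.filter(state=State.PREPARING):
        pub.cancel()
    # 2) cancel user services still being built
    for user_service in pool.userServices.filter(state=State.PREPARING):
        user_service.cancel()

def continue_removal(pool) -> None:
    # 3) drop services already in informational (final) states
    pool.userServices.filter(state__in=State.INFO_STATES).delete()
    # 4) mark usable services removable; UserServiceRemover picks them up
    pool.userServices.filter(state=State.USABLE).update(state=State.REMOVABLE)
    # 5) once no services and no publications remain, mark the pool removed
    if pool.userServices.count() == 0 and pool.publications.count() == 0:
        pool.removed()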

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -54,8 +53,11 @@ class ServiceCacheUpdater(Job):
if cache is not needed.
This is included as a scheduled task that will run every X seconds; the scheduler ensures it is only executed by one backend at a time
"""
frecuency = 19
frecuency_cfg = GlobalConfig.CACHE_CHECK_DELAY # Request run cache manager every configured seconds (defaults to 20 seconds).
frecuency_cfg = (
GlobalConfig.CACHE_CHECK_DELAY
) # Request run cache manager every configured seconds (defaults to 20 seconds).
friendly_name = 'Service Cache Updater'
@staticmethod
@ -64,18 +66,29 @@ class ServiceCacheUpdater(Job):
@staticmethod
def __notifyRestrain(servicePool) -> None:
log.doLog(servicePool, log.WARN, 'Service Pool is restrained due to excessive errors', log.INTERNAL)
log.doLog(
servicePool,
log.WARN,
'Service Pool is restrained due to excessive errors',
log.INTERNAL,
)
logger.info('%s is restrained, will check this later', servicePool.name)
def servicesPoolsNeedingCacheUpdate(self) -> typing.List[typing.Tuple[ServicePool, int, int, int]]:
def servicesPoolsNeedingCacheUpdate(
self,
) -> typing.List[typing.Tuple[ServicePool, int, int, int]]:
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
# We start filtering out the deployed services that do not need caching at all.
servicePoolsNeedingCaching: typing.Iterable[ServicePool] = ServicePool.objects.filter(
Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0)
).filter(
max_srvs__gt=0, state=State.ACTIVE, service__provider__maintenance_mode=False
).iterator()
servicePoolsNeedingCaching: typing.Iterable[ServicePool] = (
ServicePool.objects.filter(Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0))
.filter(
max_srvs__gt=0,
state=State.ACTIVE,
service__provider__maintenance_mode=False,
)
.iterator()
)
# We will get the one that proportionally needs more cache
servicesPools: typing.List[typing.Tuple[ServicePool, int, int, int]] = []
@ -84,29 +97,64 @@ class ServiceCacheUpdater(Job):
# If this deployedService doesn't have an active publication but needs one, ignore it
spServiceInstance = servicePool.service.getInstance()
if servicePool.activePublication() is None and spServiceInstance.publicationType is not None:
logger.debug('Skipping. %s Needs publication but do not have one', servicePool.name)
if (
servicePool.activePublication() is None
and spServiceInstance.publicationType is not None
):
logger.debug(
'Skipping. %s Needs publication but do not have one',
servicePool.name,
)
continue
# If it has any running publication, do not generate cache anymore
if servicePool.publications.filter(state=State.PREPARING).count() > 0:
logger.debug('Skipping cache generation for service pool with publication running: %s', servicePool.name)
logger.debug(
'Skipping cache generation for service pool with publication running: %s',
servicePool.name,
)
continue
if servicePool.isRestrained():
logger.debug('Skipping cache generation for restrained service pool: %s', servicePool.name)
logger.debug(
'Skipping cache generation for restrained service pool: %s',
servicePool.name,
)
ServiceCacheUpdater.__notifyRestrain(servicePool)
continue
# Get data related to the current state of the cache
inCacheL1: int = servicePool.cachedUserServices().filter(
userServiceManager().getCacheStateFilter(services.UserDeployment.L1_CACHE)
).exclude(
Q(properties__name='destroy_after') & Q(properties__value='y')
).count()
inCacheL2: int = servicePool.cachedUserServices().filter(userServiceManager().getCacheStateFilter(services.UserDeployment.L2_CACHE)).count()
inAssigned: int = servicePool.assignedUserServices().filter(userServiceManager().getStateFilter()).count()
inCacheL1: int = (
servicePool.cachedUserServices()
.filter(
userServiceManager().getCacheStateFilter(
services.UserDeployment.L1_CACHE
)
)
.exclude(Q(properties__name='destroy_after') & Q(properties__value='y'))
.count()
)
inCacheL2: int = (
servicePool.cachedUserServices()
.filter(
userServiceManager().getCacheStateFilter(
services.UserDeployment.L2_CACHE
)
)
.count()
)
inAssigned: int = (
servicePool.assignedUserServices()
.filter(userServiceManager().getStateFilter())
.count()
)
# If we exceed max cache, we will reduce it first, since this frees resources on the service provider
logger.debug("Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned", servicePool.name, inCacheL1, inCacheL2, inAssigned)
logger.debug(
"Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
servicePool.name,
inCacheL1,
inCacheL2,
inAssigned,
)
totalL1Assigned = inCacheL1 + inAssigned
# We have more than we want
@ -115,21 +163,34 @@ class ServiceCacheUpdater(Job):
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
continue
# We have more in L1 cache than needed
if totalL1Assigned > servicePool.initial_srvs and inCacheL1 > servicePool.cache_l1_srvs:
logger.debug('We have more services in cache L1 than configured, appending')
if (
totalL1Assigned > servicePool.initial_srvs
and inCacheL1 > servicePool.cache_l1_srvs
):
logger.debug(
'We have more services in cache L1 than configured, appending'
)
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
continue
# If we have more in L2 cache than needed, decrease L2 cache; in this case we continue checking, because L2 cache removal
# has lower priority than L1 creations or removals. We will simply take the last oversized L2 found and reduce it
if inCacheL2 > servicePool.cache_l2_srvs:
logger.debug('We have more services in L2 cache than configured, appending')
logger.debug(
'We have more services in L2 cache than configured, appending'
)
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
continue
# If this service doesn't allow more starting user services, continue
if userServiceManager().canInitiateServiceFromDeployedService(servicePool) is False:
logger.debug('This provider has the max allowed starting services running: %s', servicePool)
if (
userServiceManager().canInitiateServiceFromDeployedService(servicePool)
is False
):
logger.debug(
'This provider has the max allowed starting services running: %s',
servicePool,
)
continue
# If we need to grow L2 cache, annotate it
@ -144,14 +205,19 @@ class ServiceCacheUpdater(Job):
if totalL1Assigned == servicePool.max_srvs:
continue
if totalL1Assigned < servicePool.initial_srvs or inCacheL1 < servicePool.cache_l1_srvs:
if (
totalL1Assigned < servicePool.initial_srvs
or inCacheL1 < servicePool.cache_l1_srvs
):
logger.debug('Needs to grow L1 cache for %s', servicePool)
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
# We also return the calculated values so we can reuse them
return servicesPools
def growL1Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int) -> None:
def growL1Cache(
self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
) -> None:
"""
This method tries to enlarge L1 cache.
@ -164,9 +230,20 @@ class ServiceCacheUpdater(Job):
if cacheL2 > 0:
valid = None
with transaction.atomic():
for n in servicePool.cachedUserServices().select_for_update().filter(userServiceManager().getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'):
for n in (
servicePool.cachedUserServices()
.select_for_update()
.filter(
userServiceManager().getCacheStateFilter(
services.UserDeployment.L2_CACHE
)
)
.order_by('creation_date')
):
if n.needsOsManager():
if State.isUsable(n.state) is False or State.isUsable(n.os_state):
if State.isUsable(n.state) is False or State.isUsable(
n.os_state
):
valid = n
break
else:
@ -179,15 +256,27 @@ class ServiceCacheUpdater(Job):
try:
# This has a valid publication, or it would not be here
userServiceManager().createCacheFor(
typing.cast(ServicePoolPublication, servicePool.activePublication()), services.UserDeployment.L1_CACHE
typing.cast(ServicePoolPublication, servicePool.activePublication()),
services.UserDeployment.L1_CACHE,
)
except MaxServicesReachedError:
log.doLog(servicePool, log.ERROR, 'Max number of services reached for this service', log.INTERNAL)
logger.warning('Max user services reached for %s: %s. Cache not created', servicePool.name, servicePool.max_srvs)
log.doLog(
servicePool,
log.ERROR,
'Max number of services reached for this service',
log.INTERNAL,
)
logger.warning(
'Max user services reached for %s: %s. Cache not created',
servicePool.name,
servicePool.max_srvs,
)
except Exception:
logger.exception('Exception')
def growL2Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int) -> None:
def growL2Cache(
self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
) -> None:
"""
Tries to grow L2 cache of service.
@ -199,27 +288,38 @@ class ServiceCacheUpdater(Job):
try:
# This has a valid publication, or it would not be here
userServiceManager().createCacheFor(
typing.cast(ServicePoolPublication, servicePool.activePublication()), services.UserDeployment.L2_CACHE
typing.cast(ServicePoolPublication, servicePool.activePublication()),
services.UserDeployment.L2_CACHE,
)
except MaxServicesReachedError:
logger.warning('Max user services reached for %s: %s. Cache not created', servicePool.name, servicePool.max_srvs)
logger.warning(
'Max user services reached for %s: %s. Cache not created',
servicePool.name,
servicePool.max_srvs,
)
# TODO: When alerts are ready, notify this
def reduceL1Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int):
def reduceL1Cache(
self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
):
logger.debug("Reducing L1 cache erasing a service in cache for %s", servicePool)
# We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation
cacheItems: typing.List[UserService] = list(
servicePool.cachedUserServices().filter(
userServiceManager().getCacheStateFilter(services.UserDeployment.L1_CACHE)
).exclude(
Q(properties__name='destroy_after') & Q(properties__value='y')
).order_by(
'-creation_date'
).iterator()
servicePool.cachedUserServices()
.filter(
userServiceManager().getCacheStateFilter(
services.UserDeployment.L1_CACHE
)
)
.exclude(Q(properties__name='destroy_after') & Q(properties__value='y'))
.order_by('-creation_date')
.iterator()
)
if not cacheItems:
logger.debug('There are more services than max configured, but could not reduce L1 cache because it is already empty')
logger.debug(
'There are more services than max configured, but could not reduce L1 cache because it is already empty'
)
return
if cacheL2 < servicePool.cache_l2_srvs:
@ -240,12 +340,22 @@ class ServiceCacheUpdater(Job):
cache = cacheItems[0]
cache.removeOrCancel()
def reduceL2Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int):
logger.debug("Reducing L2 cache erasing a service in cache for %s", servicePool.name)
def reduceL2Cache(
self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
):
logger.debug(
"Reducing L2 cache erasing a service in cache for %s", servicePool.name
)
if cacheL2 > 0:
cacheItems: typing.List[UserService] = servicePool.cachedUserServices().filter(
userServiceManager().getCacheStateFilter(services.UserDeployment.L2_CACHE)
).order_by('creation_date')
cacheItems = (
servicePool.cachedUserServices()
.filter(
userServiceManager().getCacheStateFilter(
services.UserDeployment.L2_CACHE
)
)
.order_by('creation_date')
)
# TODO: Look first for non finished cache items and cancel them?
cache = cacheItems[0]
cache.removeOrCancel()
@ -267,13 +377,21 @@ class ServiceCacheUpdater(Job):
# first, the service will get locked until someone removes something.
if totalL1Assigned > servicePool.max_srvs:
self.reduceL1Cache(servicePool, cacheL1, cacheL2, assigned)
elif totalL1Assigned > servicePool.initial_srvs and cacheL1 > servicePool.cache_l1_srvs:
elif (
totalL1Assigned > servicePool.initial_srvs
and cacheL1 > servicePool.cache_l1_srvs
):
self.reduceL1Cache(servicePool, cacheL1, cacheL2, assigned)
elif cacheL2 > servicePool.cache_l2_srvs: # We have excessive L2 items
self.reduceL2Cache(servicePool, cacheL1, cacheL2, assigned)
elif totalL1Assigned < servicePool.max_srvs and (totalL1Assigned < servicePool.initial_srvs or cacheL1 < servicePool.cache_l1_srvs): # We need more services
elif totalL1Assigned < servicePool.max_srvs and (
totalL1Assigned < servicePool.initial_srvs
or cacheL1 < servicePool.cache_l1_srvs
): # We need more services
self.growL1Cache(servicePool, cacheL1, cacheL2, assigned)
elif cacheL2 < servicePool.cache_l2_srvs: # We need more L2 items
self.growL2Cache(servicePool, cacheL1, cacheL2, assigned)
else:
logger.warning("We have more services than max requested for %s", servicePool.name)
logger.warning(
"We have more services than max requested for %s", servicePool.name
)
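
The run() decision tree reformatted above encodes a fixed priority: shrink before grow, and L1 before L2, since shrinking frees resources on the service provider. Condensed as a pure function (a sketch with illustrative names, not the actual UDS API):

from dataclasses import dataclass

@dataclass
class PoolLimits:
    max_srvs: int
    initial_srvs: int
    cache_l1_srvs: int
    cache_l2_srvs: int

def cache_action(pool: PoolLimits, in_l1: int, in_l2: int, assigned: int) -> str:
    total_l1 = in_l1 + assigned
    if total_l1 > pool.max_srvs:
        return 'reduce-l1'  # over the hard limit
    if total_l1 > pool.initial_srvs and in_l1 > pool.cache_l1_srvs:
        return 'reduce-l1'  # more L1 than configured
    if in_l2 > pool.cache_l2_srvs:
        return 'reduce-l2'  # excess L2 items
    if total_l1 < pool.max_srvs and (
        total_l1 < pool.initial_srvs or in_l1 < pool.cache_l1_srvs
    ):
        return 'grow-l1'  # below initial or L1 target, with headroom left
    if in_l2 < pool.cache_l2_srvs:
        return 'grow-l2'
    return 'none'  # corresponds to the final warning branch above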

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2013-2019 Virtual Cable S.L.
# Copyright (c) 2013-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -46,31 +46,44 @@ class DeployedServiceStatsCollector(Job):
"""
This Job is responsible for collecting stats for every deployed service every ten minutes
"""
frecuency = 599 # Once every ten minutes; 601 is prime, 599 is also prime
friendly_name = 'Deployed Service Stats'
def run(self):
logger.debug('Starting Deployed service stats collector')
servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.ACTIVE).iterator()
servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(
state=State.ACTIVE
).iterator()
for servicePool in servicePoolsToCheck:
try:
fltr = servicePool.assignedUserServices().exclude(state__in=State.INFO_STATES)
fltr = servicePool.assignedUserServices().exclude(
state__in=State.INFO_STATES
)
assigned = fltr.count()
inUse = fltr.filter(in_use=True).count()
counters.addCounter(servicePool, counters.CT_ASSIGNED, assigned)
counters.addCounter(servicePool, counters.CT_INUSE, inUse)
except Exception:
logger.exception('Getting counters for service pool %s', servicePool.name)
logger.exception(
'Getting counters for service pool %s', servicePool.name
)
for auth in Authenticator.objects.all():
fltr = auth.users.filter(userServices__isnull=False).exclude(userServices__state__in=State.INFO_STATES)
fltr = auth.users.filter(userServices__isnull=False).exclude(
userServices__state__in=State.INFO_STATES
)
users = auth.users.all().count()
users_with_service = fltr.distinct().count()
number_assigned_services = fltr.count()
counters.addCounter(auth, counters.CT_AUTH_USERS, users)
counters.addCounter(auth, counters.CT_AUTH_SERVICES, number_assigned_services)
counters.addCounter(auth, counters.CT_AUTH_USERS_WITH_SERVICES, users_with_service)
counters.addCounter(
auth, counters.CT_AUTH_SERVICES, number_assigned_services
)
counters.addCounter(
auth, counters.CT_AUTH_USERS_WITH_SERVICES, users_with_service
)
logger.debug('Done Deployed service stats collector')

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -43,13 +42,17 @@ from uds.core.util import log
logger = logging.getLogger(__name__)
MAX_STUCK_TIME = 3600 * 24 # At most 1 day "stuck", not configurable (there is no need to)
MAX_STUCK_TIME = (
3600 * 24
) # At most 1 day "stuck", not configurable (there is no need to)
class StuckCleaner(Job):
"""
Kaputen Cleaner is very similar to Hanged Cleaner.
We keep it in a new place to "control" more specific things
"""
frecuency = 3601 * 8 # Executes roughly once every 8 hours
friendly_name = 'Stuck States cleaner'
@ -57,33 +60,43 @@ class StuckCleaner(Job):
since_state: datetime = getSqlDatetime() - timedelta(seconds=MAX_STUCK_TIME)
# Filter for locating machines stuck on removing, cancelling, etc.
# Locate service pools with user services stuck in those states
servicePoolswithStucks = ServicePool.objects.annotate(
stuckCount=Count(
'userServices',
filter=Q(
userServices__state_date__lt=since_state
) & (Q(
userServices__state=State.PREPARING, userServices__properties__name='destroy_after'
) | ~Q(
userServices__state__in=State.INFO_STATES + State.VALID_STATES
))
servicePoolswithStucks = (
ServicePool.objects.annotate(
stuckCount=Count(
'userServices',
filter=Q(userServices__state_date__lt=since_state)
& (
Q(
userServices__state=State.PREPARING,
userServices__properties__name='destroy_after',
)
| ~Q(
userServices__state__in=State.INFO_STATES
+ State.VALID_STATES
)
),
)
)
).filter(service__provider__maintenance_mode=False, state=State.ACTIVE).exclude(stuckCount=0)
.filter(service__provider__maintenance_mode=False, state=State.ACTIVE)
.exclude(stuckCount=0)
)
# Info states are removed by UserServiceCleaner and VALID_STATES are ok; "hanged" ones are handled by HangedCleaner
def stuckUserServices(servicePool: ServicePool ) -> typing.Iterable[UserService]:
q = servicePool.userServices.filter(
state_date__lt=since_state
)
yield from q.exclude(
state__in=State.INFO_STATES + State.VALID_STATES
)
def stuckUserServices(servicePool: ServicePool) -> typing.Iterable[UserService]:
q = servicePool.userServices.filter(state_date__lt=since_state)
yield from q.exclude(state__in=State.INFO_STATES + State.VALID_STATES)
yield from q.filter(state=State.PREPARING, properties__name='destroy_after')
for servicePool in servicePoolswithStucks:
# logger.debug('Searching for stuck states for %s', servicePool.name)
for stuck in stuckUserServices(servicePool):
logger.debug('Found stuck user service %s', stuck)
log.doLog(servicePool, log.ERROR, 'User service {} has been hard removed because it\'s stuck'.format(stuck.name))
log.doLog(
servicePool,
log.ERROR,
'User service {} has been hard removed because it\'s stuck'.format(
stuck.name
),
)
# stuck.setState(State.ERROR)
stuck.delete()
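
The stuckUserServices generator above deliberately streams two disjoint sets: first everything older than since_state whose state falls outside INFO_STATES + VALID_STATES, then the PREPARING services (a valid state, so skipped by the first pass) that carry the destroy_after property. Restated as a standalone sketch with that reasoning as comments (State/UserService import paths assumed):

import typing

from uds.core.util.state import State  # assumed import path
from uds.models import UserService  # assumed import path

def stuck_user_services(service_pool, since_state) -> typing.Iterator[UserService]:
    q = service_pool.userServices.filter(state_date__lt=since_state)
    # 1) anything in a state that is neither informational nor valid
    yield from q.exclude(state__in=State.INFO_STATES + State.VALID_STATES)
    # 2) valid-looking PREPARING services explicitly flagged for destruction
    yield from q.filter(state=State.PREPARING, properties__name='destroy_after')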

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -32,6 +32,7 @@
"""
from importlib import import_module
import logging
import typing
from django.conf import settings
from uds.core.util.cache import Cache
@ -65,13 +66,13 @@ class TicketStoreCleaner(Job):
class SessionsCleaner(Job):
frecuency = 3600 * 24 * 7 # Once a day will be enough
frecuency = 3600 * 24 * 7 # Once a week will be enough
friendly_name = 'User Sessions cleaner'
def run(self):
logger.debug('Starting session cleanup')
try:
engine = import_module(settings.SESSION_ENGINE)
engine: typing.Any = import_module(settings.SESSION_ENGINE)
except Exception:
logger.exception('DjangoSessionsCleaner')
return
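
The hunk above only shows the engine import getting a typing annotation; the cleanup call itself (outside the hunk) goes through the engine's session store. In stock Django that entry point is SessionStore.clear_expired(): database- and file-backed engines implement it, while stores that cannot enumerate their sessions either no-op or raise NotImplementedError, hence the defensive try/except in this worker. A minimal sketch of the standard pattern:

import typing
from importlib import import_module

from django.conf import settings

def clear_expired_sessions() -> None:
    engine: typing.Any = import_module(settings.SESSION_ENGINE)
    try:
        # Same classmethod Django's own clearsessions command calls
        engine.SessionStore.clear_expired()
    except NotImplementedError:
        pass  # backend cannot enumerate sessions; nothing to clean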

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -46,5 +45,11 @@ class UsageAccounting(Job):
def run(self):
with transaction.atomic():
AccountUsage.objects.select_for_update().filter(user_service__in_use=True).update(end=getSqlDatetime())
AccountUsage.objects.select_for_update().filter(user_service__in_use=False).update(user_service=None) # Cleanup
AccountUsage.objects.select_for_update().filter(
user_service__in_use=True
).update(end=getSqlDatetime())
AccountUsage.objects.select_for_update().filter(
user_service__in_use=False
).update(
user_service=None
) # Cleanup

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -51,38 +50,58 @@ logger = logging.getLogger(__name__)
class UserServiceInfoItemsCleaner(Job):
frecuency = 14401
frecuency_cfg = GlobalConfig.KEEP_INFO_TIME # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.KEEP_INFO_TIME
) # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'User Service Info Cleaner'
def run(self):
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True))
def run(self) -> None:
removeFrom = getSqlDatetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True)
)
logger.debug('Removing information user services from %s', removeFrom)
with transaction.atomic():
UserService.objects.select_for_update().filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
UserService.objects.select_for_update().filter(
state__in=State.INFO_STATES, state_date__lt=removeFrom
).delete()
class UserServiceRemover(Job):
frecuency = 31
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
frecuency_cfg = (
GlobalConfig.REMOVAL_CHECK
) # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
friendly_name = 'User Service Cleaner'
def run(self):
removeAtOnce: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt() # Same, it will work at reload
def run(self) -> None:
removeAtOnce: int = (
GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt()
) # Same, it will work at reload
with transaction.atomic():
removeFrom = getSqlDatetime() - timedelta(seconds=10) # We keep the machine for at least 10 seconds before removing it, so we avoid connection errors
removableUserServices: typing.Iterable[UserService] = UserService.objects.filter(
removeFrom = getSqlDatetime() - timedelta(
seconds=10
) # We keep the machine for at least 10 seconds before removing it, so we avoid connection errors
removableUserServices: typing.Iterable[
UserService
] = UserService.objects.filter(
state=State.REMOVABLE,
state_date__lt=removeFrom,
deployed_service__service__provider__maintenance_mode=False
)[0:removeAtOnce].iterator()
deployed_service__service__provider__maintenance_mode=False,
)[
0:removeAtOnce
].iterator()
manager = managers.userServiceManager()
for removableUserService in removableUserServices:
logger.debug('Checking removal of %s', removableUserService.name)
try:
if manager.canRemoveServiceFromDeployedService(removableUserService.deployed_service) is True:
if (
manager.canRemoveServiceFromDeployedService(
removableUserService.deployed_service
)
is True
):
manager.remove(removableUserService)
except Exception:
logger.exception('Exception removing user service')
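
A closing note on the removal batch above: the [0:removeAtOnce] slice compiles to a LIMIT clause in SQL, and .iterator() streams the rows without populating the queryset cache, so each run of the job touches at most removeAtOnce services. The general shape, as a sketch (import paths assumed):

import typing

from uds.core.util.state import State  # assumed import path
from uds.models import UserService  # assumed import path

def removal_candidates(limit: int) -> typing.Iterator[UserService]:
    # LIMIT <limit> in SQL; .iterator() streams rows one by one
    return UserService.objects.filter(
        state=State.REMOVABLE,
        deployed_service__service__provider__maintenance_mode=False,
    )[:limit].iterator()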