mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-08 21:18:00 +03:00
fixed workers for 3.7
This commit is contained in:
parent
064423413b
commit
52054ae45b
@ -56,7 +56,7 @@ def getComputerName():
|
||||
|
||||
def getNetworkInfo():
|
||||
obj = win32com.client.Dispatch("WbemScripting.SWbemLocator")
|
||||
wmobj = obj.ConnectServer("localhost", "root\cimv2")
|
||||
wmobj = obj.ConnectServer("localhost", "root\\cimv2")
|
||||
adapters = wmobj.ExecQuery("Select * from Win32_NetworkAdapterConfiguration where IpEnabled=True")
|
||||
try:
|
||||
for obj in adapters:
|
||||
|
@ -42,7 +42,7 @@ class Job(Environmentable):
|
||||
# Default frecuency, once a day. Remenber that precision will be based on "granurality" of Scheduler
|
||||
# If a job is used for delayed execution, this attribute is in fact ignored
|
||||
frecuency: int = 24 * 3600 + 3 # Defaults to a big one, and i know frecuency is written as frequency, but this is an "historical mistake" :)
|
||||
frecuency_cfg: typing.Optional[Config] = None # If we use a configuration variable from DB, we need to update the frecuency asap, but not before app is ready
|
||||
frecuency_cfg: typing.Optional[Config.Value] = None # If we use a configuration variable from DB, we need to update the frecuency asap, but not before app is ready
|
||||
friendly_name = 'Unknown'
|
||||
|
||||
@classmethod
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,8 +30,6 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
|
||||
@ -48,9 +46,6 @@ class AssignedAndUnused(Job):
|
||||
frecuency_cfg = GlobalConfig.CHECK_UNUSED_TIME
|
||||
friendly_name = 'Unused services checker'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(AssignedAndUnused, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
since_state = getSqlDatetime() - timedelta(seconds=self.frecuency)
|
||||
for ds in DeployedService.objects.all():
|
||||
@ -61,11 +56,11 @@ class AssignedAndUnused(Job):
|
||||
if ds.osmanager is not None:
|
||||
osm = ds.osmanager.getInstance()
|
||||
if osm.processUnusedMachines is True:
|
||||
logger.debug('Processing unused services for {}, {}'.format(ds, ds.osmanager))
|
||||
logger.debug('Processing unused services for %s, %s', ds, ds.osmanager)
|
||||
for us in ds.assignedUserServices().filter(in_use=False, state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
|
||||
logger.debug('Found unused assigned service {0}'.format(us))
|
||||
logger.debug('Found unused assigned service %s', us)
|
||||
osm.processUnused(us)
|
||||
else: # No os manager, simply remove unused services in specified time
|
||||
for us in ds.assignedUserServices().filter(in_use=False, state_date__lt=since_state, state=State.USABLE, os_state=State.USABLE):
|
||||
logger.debug('Found unused assigned service with no OS Manager {0}'.format(us))
|
||||
logger.debug('Found unused assigned service with no OS Manager %s', us)
|
||||
us.remove()
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,15 +30,15 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django.db import transaction
|
||||
from uds.core.util.Config import GlobalConfig
|
||||
from uds.models import DeployedService, getSqlDatetime
|
||||
from uds.models import ServicePool, UserService, getSqlDatetime
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs.Job import Job
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -48,12 +48,9 @@ class DeployedServiceInfoItemsCleaner(Job):
|
||||
frecuency_cfg = GlobalConfig.CLEANUP_CHECK # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'Deployed Service Info Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(DeployedServiceInfoItemsCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt())
|
||||
DeployedService.objects.filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
|
||||
ServicePool.objects.filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
|
||||
|
||||
|
||||
class DeployedServiceRemover(Job):
|
||||
@ -61,97 +58,90 @@ class DeployedServiceRemover(Job):
|
||||
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run publication "removal" every configued seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'Deployed Service Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(DeployedServiceRemover, self).__init__(environment)
|
||||
|
||||
def startRemovalOf(self, ds):
|
||||
if ds.service is None: # Maybe an inconsistent value?
|
||||
logger.error('Found service pool {} without service')
|
||||
ds.delete()
|
||||
def startRemovalOf(self, servicePool: ServicePool):
|
||||
if servicePool.service is None: # Maybe an inconsistent value? (must not, but if no ref integrity in db, maybe someone "touched.. ;)")
|
||||
logger.error('Found service pool %s without service', servicePool.name)
|
||||
servicePool.delete() # Just remove it "a las bravas", the best we can do
|
||||
return
|
||||
|
||||
# Get publications in course...., that only can be one :)
|
||||
logger.debug('Removal process of {0}'.format(ds))
|
||||
logger.debug('Removal process of %s', servicePool)
|
||||
|
||||
publishing = ds.publications.filter(state=State.PREPARING)
|
||||
for p in publishing:
|
||||
p.cancel()
|
||||
publishing = servicePool.publications.filter(state=State.PREPARING)
|
||||
for pub in publishing:
|
||||
pub.cancel()
|
||||
# Now all publishments are canceling, let's try to cancel cache and assigned
|
||||
uServices = ds.userServices.filter(state=State.PREPARING)
|
||||
for u in uServices:
|
||||
logger.debug('Canceling {0}'.format(u))
|
||||
u.cancel()
|
||||
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(state=State.PREPARING)
|
||||
for userService in uServices:
|
||||
logger.debug('Canceling %s', userService)
|
||||
userService.cancel()
|
||||
# Nice start of removal, maybe we need to do some limitation later, but there should not be too much services nor publications cancelable at once
|
||||
ds.state = State.REMOVING
|
||||
ds.name += ' (removed)'
|
||||
ds.save()
|
||||
servicePool.state = State.REMOVING
|
||||
servicePool.name += ' (removed)'
|
||||
servicePool.save()
|
||||
|
||||
def continueRemovalOf(self, ds):
|
||||
def continueRemovalOf(self, servicePool: ServicePool):
|
||||
# Recheck that there is no publication created in "bad moment"
|
||||
try:
|
||||
for p in ds.publications.filter(state=State.PREPARING):
|
||||
p.cancel()
|
||||
for pub in servicePool.publications.filter(state=State.PREPARING):
|
||||
pub.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
# Now all publishments are canceling, let's try to cancel cache and assigned
|
||||
uServices = ds.userServices.filter(state=State.PREPARING)
|
||||
for u in uServices:
|
||||
logger.debug('Canceling {0}'.format(u))
|
||||
u.cancel()
|
||||
# Now all publications are canceling, let's try to cancel cache and assigned also
|
||||
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(state=State.PREPARING)
|
||||
for userService in uServices:
|
||||
logger.debug('Canceling %s', userService)
|
||||
userService.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# First, we remove all publications and user services in "info_state"
|
||||
with transaction.atomic():
|
||||
ds.userServices.select_for_update().filter(state__in=State.INFO_STATES).delete()
|
||||
servicePool.userServices.select_for_update().filter(state__in=State.INFO_STATES).delete()
|
||||
|
||||
# Mark usable user services as removable
|
||||
now = getSqlDatetime()
|
||||
|
||||
with transaction.atomic():
|
||||
ds.userServices.select_for_update().filter(state=State.USABLE).update(state=State.REMOVABLE, state_date=now)
|
||||
servicePool.userServices.select_for_update().filter(state=State.USABLE).update(state=State.REMOVABLE, state_date=now)
|
||||
|
||||
# When no service is at database, we start with publications
|
||||
if ds.userServices.all().count() == 0:
|
||||
if servicePool.userServices.all().count() == 0:
|
||||
try:
|
||||
logger.debug('All services removed, checking active publication')
|
||||
if ds.activePublication() is not None:
|
||||
if servicePool.activePublication() is not None:
|
||||
logger.debug('Active publication found, unpublishing it')
|
||||
ds.unpublish()
|
||||
servicePool.unpublish()
|
||||
else:
|
||||
logger.debug('No active publication found, removing info states and checking if removal is done')
|
||||
ds.publications.filter(state__in=State.INFO_STATES).delete()
|
||||
if ds.publications.count() is 0:
|
||||
ds.removed() # Mark it as removed, clean later from database
|
||||
servicePool.publications.filter(state__in=State.INFO_STATES).delete()
|
||||
if servicePool.publications.count() == 0:
|
||||
servicePool.removed() # Mark it as removed, clean later from database
|
||||
except Exception:
|
||||
logger.exception('Cought unexpected exception at continueRemovalOf: ')
|
||||
|
||||
def run(self):
|
||||
# First check if there is someone in "removable" estate
|
||||
rems = DeployedService.objects.filter(state=State.REMOVABLE)[:10]
|
||||
if len(rems) > 0:
|
||||
# logger.debug('Found a deployed service marked for removal. Starting removal of {0}'.format(rems))
|
||||
for ds in rems:
|
||||
removableServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.REMOVABLE)[:10]
|
||||
for servicePool in removableServicePools:
|
||||
try:
|
||||
# Skips checking deployed services in maintenance mode
|
||||
if servicePool.isInMaintenance() is False:
|
||||
self.startRemovalOf(servicePool)
|
||||
except Exception as e1:
|
||||
logger.error('Error removing service pool %s: %s', servicePool.name, e1)
|
||||
try:
|
||||
# Skips checking deployed services in maintenance mode
|
||||
if ds.isInMaintenance() is False:
|
||||
self.startRemovalOf(ds)
|
||||
except Exception as e1:
|
||||
logger.error('Error removing {}: {}'.format(ds, e1))
|
||||
try:
|
||||
ds.delete()
|
||||
except Exception as e2:
|
||||
logger.error('Could not delete {}'.format(e2))
|
||||
servicePool.delete()
|
||||
except Exception as e2:
|
||||
logger.error('Could not delete %s', e2)
|
||||
|
||||
rems = DeployedService.objects.filter(state=State.REMOVING)[:10]
|
||||
if len(rems) > 0:
|
||||
# logger.debug('Found a deployed service in removing state, continuing removal of {0}'.format(rems))
|
||||
for ds in rems:
|
||||
try:
|
||||
# Skips checking deployed services in maintenance mode
|
||||
if ds.isInMaintenance() is False:
|
||||
self.continueRemovalOf(ds)
|
||||
except Exception:
|
||||
logger.exception('Removing deployed service')
|
||||
removingServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.REMOVING)[:10]
|
||||
for servicePool in removingServicePools:
|
||||
try:
|
||||
# Skips checking deployed services in maintenance mode
|
||||
if servicePool.isInMaintenance() is False:
|
||||
self.continueRemovalOf(servicePool)
|
||||
except Exception:
|
||||
logger.exception('Removing deployed service')
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,16 +30,15 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
from django.db.models import Q
|
||||
from uds.core.util.Config import GlobalConfig
|
||||
from uds.models import DeployedService, getSqlDatetime
|
||||
from uds.models import ServicePool, getSqlDatetime
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs.Job import Job
|
||||
from uds.core.util import log
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -49,20 +48,20 @@ class HangedCleaner(Job):
|
||||
frecuency_cfg = GlobalConfig.MAX_INITIALIZING_TIME
|
||||
friendly_name = 'Hanged services checker'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(HangedCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
since_state = getSqlDatetime() - timedelta(seconds=GlobalConfig.MAX_INITIALIZING_TIME.getInt())
|
||||
# Filter for locating machine not ready
|
||||
flt = Q(state_date__lt=since_state, state=State.PREPARING) | Q(state_date__lt=since_state, state=State.USABLE, os_state=State.PREPARING) | Q(state_date__lt=since_state, state=State.REMOVING)
|
||||
|
||||
for ds in DeployedService.objects.exclude(osmanager=None, state__in=State.VALID_STATES, service__provider__maintenance_mode=True):
|
||||
logger.debug('Searching for hanged services for {0}'.format(ds))
|
||||
for us in ds.userServices.filter(flt):
|
||||
logger.debug('Found hanged service {0}'.format(us))
|
||||
# Type
|
||||
servicePool: ServicePool
|
||||
|
||||
for servicePool in ServicePool.objects.exclude(osmanager=None, state__in=State.VALID_STATES, service__provider__maintenance_mode=True):
|
||||
logger.debug('Searching for hanged services for %s', servicePool)
|
||||
for us in servicePool.userServices.filter(flt):
|
||||
logger.debug('Found hanged service %s', us)
|
||||
log.doLog(us, log.ERROR, 'User Service seems to be hanged. Removing it.', log.INTERNAL)
|
||||
log.doLog(ds, log.ERROR, 'Removing user service {0} because it seems to be hanged'.format(us.friendly_name))
|
||||
log.doLog(servicePool, log.ERROR, 'Removing user service {0} because it seems to be hanged'.format(us.friendly_name))
|
||||
if us.state in (State.REMOVING,):
|
||||
us.setState(State.ERROR)
|
||||
else:
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,7 +30,9 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from uds.core.managers.PublicationManager import PublicationManager
|
||||
from uds.core.util.Config import GlobalConfig
|
||||
@ -38,8 +40,6 @@ from uds.models import DeployedServicePublication, getSqlDatetime
|
||||
from uds.core.services.Exceptions import PublishException
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs import Job
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -49,26 +49,20 @@ class PublicationInfoItemsCleaner(Job):
|
||||
frecuency_cfg = GlobalConfig.CLEANUP_CHECK # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'Publications Info Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(PublicationInfoItemsCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True))
|
||||
DeployedServicePublication.objects.filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
|
||||
|
||||
|
||||
class PublicationCleaner(Job):
|
||||
frecuency = GlobalConfig.REMOVAL_CHECK.getInt() # Request run publication "removal" every configued seconds. If config value is changed, it will be used at next reload
|
||||
frecuency = 31
|
||||
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run publication "removal" every configued seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'Publication Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(PublicationCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
removables = DeployedServicePublication.objects.filter(state=State.REMOVABLE, deployed_service__service__provider__maintenance_mode=False)
|
||||
removables: typing.Iterable[DeployedServicePublication] = DeployedServicePublication.objects.filter(state=State.REMOVABLE, deployed_service__service__provider__maintenance_mode=False)
|
||||
for removable in removables:
|
||||
try:
|
||||
PublicationManager.manager().unpublish(removable)
|
||||
except PublishException: # Can say that it cant be removed right now
|
||||
logger.debug('Delaying removal')
|
||||
pass
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,11 +30,10 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
import logging
|
||||
|
||||
from uds.models import CalendarAction, getSqlDatetime
|
||||
from uds.core.jobs.Job import Job
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -43,13 +42,14 @@ class ScheduledAction(Job):
|
||||
frecuency = 29 # Frecuncy for this job
|
||||
friendly_name = 'Scheduled action runner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(ScheduledAction, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
for ca in CalendarAction.objects.filter(service_pool__service__provider__maintenance_mode=False, next_execution__lt=getSqlDatetime()).order_by('next_execution'):
|
||||
logger.info('Executing calendar action {}.{} ({})'.format(ca.service_pool.name, ca.calendar.name, ca.action))
|
||||
configuredAction: CalendarAction
|
||||
for configuredAction in CalendarAction.objects.filter(
|
||||
service_pool__service__provider__maintenance_mode=False,
|
||||
next_execution__lt=getSqlDatetime()
|
||||
).order_by('next_execution'):
|
||||
logger.info('Executing calendar action %s.%s (%s)', configuredAction.service_pool.name, configuredAction.calendar.name, configuredAction.action)
|
||||
try:
|
||||
ca.execute()
|
||||
except Exception as e:
|
||||
logger.exception('Got an exception executing calendar access action: {}'.format(e))
|
||||
configuredAction.execute()
|
||||
except Exception:
|
||||
logger.exception('Got an exception executing calendar access action: %s', configuredAction)
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,15 +30,14 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
from uds.models import Scheduler, getSqlDatetime
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs.Job import Job
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -47,9 +46,6 @@ class SchedulerHousekeeping(Job):
|
||||
frecuency = 301 # Frecuncy for this job
|
||||
friendly_name = 'Scheduler house keeping'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(SchedulerHousekeeping, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Look for "hanged" scheduler tasks and reset them
|
||||
@ -57,5 +53,3 @@ class SchedulerHousekeeping(Job):
|
||||
since = getSqlDatetime() - timedelta(minutes=15)
|
||||
with transaction.atomic():
|
||||
Scheduler.objects.select_for_update().filter(last_execution__lt=since, state=State.RUNNING).update(owner_server='', state=State.FOR_EXECUTE)
|
||||
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,7 +30,8 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django.db import transaction
|
||||
from django.db.models import Q
|
||||
@ -38,11 +39,10 @@ from uds.core.util.Config import GlobalConfig
|
||||
from uds.core.util.State import State
|
||||
from uds.core.managers.UserServiceManager import UserServiceManager
|
||||
from uds.core.services.Exceptions import MaxServicesReachedError
|
||||
from uds.models import DeployedService
|
||||
from uds.models import ServicePool, DeployedServicePublication, UserService
|
||||
from uds.core import services
|
||||
from uds.core.util import log
|
||||
from uds.core.jobs.Job import Job
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -56,99 +56,98 @@ class ServiceCacheUpdater(Job):
|
||||
"""
|
||||
frecuency = 19
|
||||
frecuency_cfg = GlobalConfig.CACHE_CHECK_DELAY # Request run cache manager every configured seconds (defaults to 20 seconds).
|
||||
|
||||
friendly_name = 'Service Cache Updater'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(ServiceCacheUpdater, self).__init__(environment)
|
||||
@staticmethod
|
||||
def calcProportion(max_, actual) -> int:
|
||||
return actual * 10000 // (max_ or 1)
|
||||
|
||||
@staticmethod
|
||||
def calcProportion(max_, actual):
|
||||
return actual * 10000 / (max_ or 1)
|
||||
def __notifyRestrain(servicePool) -> None:
|
||||
log.doLog(servicePool, log.WARN, 'Service Pool is restrained due to excesive errors', log.INTERNAL)
|
||||
logger.info('%s is restrained, will check this later', servicePool.name)
|
||||
|
||||
@staticmethod
|
||||
def __notifyRestrain(deployedService):
|
||||
log.doLog(deployedService, log.WARN, 'Service Pool is restrained due to errors', log.INTERNAL)
|
||||
logger.info(' {0} is restrained, will check this later'.format(deployedService.name))
|
||||
|
||||
def servicesPoolsNeedingCacheUpdate(self):
|
||||
def servicesPoolsNeedingCacheUpdate(self) -> typing.List[typing.Tuple[ServicePool, int, int, int]]:
|
||||
# State filter for cached and inAssigned objects
|
||||
# First we get all deployed services that could need cache generation
|
||||
DeployedService.objects.update()
|
||||
# We start filtering out the deployed services that do not need caching at all.
|
||||
whichNeedsCaching = DeployedService.objects.filter(Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0)).filter(max_srvs__gt=0, state=State.ACTIVE,
|
||||
service__provider__maintenance_mode=False)[:]
|
||||
servicePoolsNeedingCaching: typing.Iterable[ServicePool] = ServicePool.objects.filter(
|
||||
Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0)
|
||||
).filter(
|
||||
max_srvs__gt=0, state=State.ACTIVE, service__provider__maintenance_mode=False
|
||||
).iterator()
|
||||
|
||||
# We will get the one that proportionally needs more cache
|
||||
servicesPools = []
|
||||
for sp in whichNeedsCaching:
|
||||
sp.userServices.update() # Cleans cached queries
|
||||
servicesPools: typing.List[typing.Tuple[ServicePool, int, int, int]] = []
|
||||
for servicePool in servicePoolsNeedingCaching:
|
||||
servicePool.userServices.update() # Cleans cached queries
|
||||
# If this deployedService don't have a publication active and needs it, ignore it
|
||||
spServiceInstance = sp.service.getInstance()
|
||||
spServiceInstance = servicePool.service.getInstance()
|
||||
|
||||
if sp.activePublication() is None and spServiceInstance.publicationType is not None:
|
||||
logger.debug('{} Needs publication but do not have one, cache test ignored'.format(sp))
|
||||
if servicePool.activePublication() is None and spServiceInstance.publicationType is not None:
|
||||
logger.debug('Skipping. %s Needs publication but do not have one', servicePool.name)
|
||||
continue
|
||||
# If it has any running publication, do not generate cache anymore
|
||||
if sp.publications.filter(state=State.PREPARING).count() > 0:
|
||||
logger.debug('Stopped cache generation for deployed service with publication running: {0}'.format(sp))
|
||||
if servicePool.publications.filter(state=State.PREPARING).count() > 0:
|
||||
logger.debug('Skipping cache generation for service pool with publication running: %s', servicePool.name)
|
||||
continue
|
||||
|
||||
if sp.isRestrained():
|
||||
ServiceCacheUpdater.__notifyRestrain(sp)
|
||||
if servicePool.isRestrained():
|
||||
logger.debug('StopSkippingped cache generation for restrained service pool: %s', servicePool.name)
|
||||
ServiceCacheUpdater.__notifyRestrain(servicePool)
|
||||
continue
|
||||
|
||||
# Get data related to actual state of cache
|
||||
inCacheL1 = sp.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L1_CACHE)).count()
|
||||
inCacheL2 = sp.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).count()
|
||||
inAssigned = sp.assignedUserServices().filter(UserServiceManager.getStateFilter()).count()
|
||||
inCacheL1: int = servicePool.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L1_CACHE)).count()
|
||||
inCacheL2: int = servicePool.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).count()
|
||||
inAssigned: int = servicePool.assignedUserServices().filter(UserServiceManager.getStateFilter()).count()
|
||||
# if we bypasses max cache, we will reduce it in first place. This is so because this will free resources on service provider
|
||||
logger.debug("Examining {0} with {1} in cache L1 and {2} in cache L2, {3} inAssigned".format(
|
||||
sp, inCacheL1, inCacheL2, inAssigned))
|
||||
logger.debug("Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned", servicePool.name, inCacheL1, inCacheL2, inAssigned)
|
||||
totalL1Assigned = inCacheL1 + inAssigned
|
||||
|
||||
# We have more than we want
|
||||
if totalL1Assigned > sp.max_srvs:
|
||||
logger.debug('We have more services than max configured')
|
||||
servicesPools.append((sp, inCacheL1, inCacheL2, inAssigned))
|
||||
if totalL1Assigned > servicePool.max_srvs:
|
||||
logger.debug('We have more services than max configured. skipping.')
|
||||
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
|
||||
continue
|
||||
# We have more in L1 cache than needed
|
||||
if totalL1Assigned > sp.initial_srvs and inCacheL1 > sp.cache_l1_srvs:
|
||||
logger.debug('We have more services in cache L1 than configured')
|
||||
servicesPools.append((sp, inCacheL1, inCacheL2, inAssigned))
|
||||
if totalL1Assigned > servicePool.initial_srvs and inCacheL1 > servicePool.cache_l1_srvs:
|
||||
logger.debug('We have more services in cache L1 than configured, appending')
|
||||
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
|
||||
continue
|
||||
|
||||
# If we have more in L2 cache than needed, decrease L2 cache, but int this case, we continue checking cause L2 cache removal
|
||||
# has less priority than l1 creations or removals, but higher. In this case, we will simply take last l2 oversized found and reduce it
|
||||
if inCacheL2 > sp.cache_l2_srvs:
|
||||
logger.debug('We have more services in L2 cache than configured, decreasing it')
|
||||
servicesPools.append((sp, inCacheL1, inCacheL2, inAssigned))
|
||||
if inCacheL2 > servicePool.cache_l2_srvs:
|
||||
logger.debug('We have more services in L2 cache than configured, appending')
|
||||
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
|
||||
continue
|
||||
|
||||
# If this service don't allows more starting user services, continue
|
||||
if UserServiceManager.manager().canInitiateServiceFromDeployedService(sp) is False:
|
||||
logger.debug('This provider has the max allowed starting services running: {0}'.format(sp))
|
||||
if UserServiceManager.manager().canInitiateServiceFromDeployedService(servicePool) is False:
|
||||
logger.debug('This provider has the max allowed starting services running: %s', servicePool)
|
||||
continue
|
||||
|
||||
# If wee need to grow l2 cache, annotate it
|
||||
# Whe check this before checking the total, because the l2 cache is independent of max services or l1 cache.
|
||||
# It reflects a value that must be keeped in cache for futre fast use.
|
||||
if inCacheL2 < sp.cache_l2_srvs:
|
||||
logger.debug('Needs to grow L2 cache for {}'.format(sp))
|
||||
servicesPools.append((sp, inCacheL1, inCacheL2, inAssigned))
|
||||
if inCacheL2 < servicePool.cache_l2_srvs:
|
||||
logger.debug('Needs to grow L2 cache for %s', servicePool)
|
||||
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
|
||||
continue
|
||||
|
||||
# We skip it if already at max
|
||||
if totalL1Assigned == sp.max_srvs:
|
||||
if totalL1Assigned == servicePool.max_srvs:
|
||||
continue
|
||||
|
||||
if totalL1Assigned < sp.initial_srvs or inCacheL1 < sp.cache_l1_srvs:
|
||||
logger.debug('Needs to grow L1 cache for {}'.format(sp))
|
||||
servicesPools.append((sp, inCacheL1, inCacheL2, inAssigned))
|
||||
if totalL1Assigned < servicePool.initial_srvs or inCacheL1 < servicePool.cache_l1_srvs:
|
||||
logger.debug('Needs to grow L1 cache for %s', servicePool)
|
||||
servicesPools.append((servicePool, inCacheL1, inCacheL2, inAssigned))
|
||||
|
||||
# We also return calculated values so we can reuse then
|
||||
return servicesPools
|
||||
|
||||
def growL1Cache(self, sp, cacheL1, cacheL2, assigned):
|
||||
def growL1Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int) -> None:
|
||||
"""
|
||||
This method tries to enlarge L1 cache.
|
||||
|
||||
@ -156,12 +155,12 @@ class ServiceCacheUpdater(Job):
|
||||
and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
|
||||
this method will not grow the L1 cache
|
||||
"""
|
||||
logger.debug("Growing L1 cache creating a new service for {0}".format(sp))
|
||||
logger.debug('Growing L1 cache creating a new service for %s', servicePool.name)
|
||||
# First, we try to assign from L2 cache
|
||||
if cacheL2 > 0:
|
||||
valid = None
|
||||
with transaction.atomic():
|
||||
for n in sp.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'):
|
||||
for n in servicePool.cachedUserServices().select_for_update().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date'):
|
||||
if n.needsOsManager():
|
||||
if State.isUsable(n.state) is False or State.isUsable(n.os_state):
|
||||
valid = n
|
||||
@ -174,14 +173,17 @@ class ServiceCacheUpdater(Job):
|
||||
valid.moveToLevel(services.UserDeployment.L1_CACHE)
|
||||
return
|
||||
try:
|
||||
UserServiceManager.manager().createCacheFor(sp.activePublication(), services.UserDeployment.L1_CACHE)
|
||||
except MaxServicesReachedError as e:
|
||||
log.doLog(sp, log.ERROR, 'Max number of services reached for this service', log.INTERNAL)
|
||||
logger.error(str(e))
|
||||
except:
|
||||
# This has a velid publication, or it will not be here
|
||||
UserServiceManager.manager().createCacheFor(
|
||||
typing.cast(DeployedServicePublication, servicePool.activePublication()), services.UserDeployment.L1_CACHE
|
||||
)
|
||||
except MaxServicesReachedError:
|
||||
log.doLog(servicePool, log.ERROR, 'Max number of services reached for this service', log.INTERNAL)
|
||||
logger.warning('Max user services reached for %s: %s. Cache not created', servicePool.name, servicePool.max_srvs)
|
||||
except Exception:
|
||||
logger.exception('Exception')
|
||||
|
||||
def growL2Cache(self, sp, cacheL1, cacheL2, assigned):
|
||||
def growL2Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int) -> None:
|
||||
"""
|
||||
Tries to grow L2 cache of service.
|
||||
|
||||
@ -189,22 +191,28 @@ class ServiceCacheUpdater(Job):
|
||||
and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
|
||||
this method will not grow the L1 cache
|
||||
"""
|
||||
logger.debug("Growing L2 cache creating a new service for {0}".format(sp))
|
||||
logger.debug("Growing L2 cache creating a new service for %s", servicePool.name)
|
||||
try:
|
||||
UserServiceManager.manager().createCacheFor(sp.activePublication(), services.UserDeployment.L2_CACHE)
|
||||
except MaxServicesReachedError as e:
|
||||
logger.error(str(e))
|
||||
# This has a velid publication, or it will not be here
|
||||
UserServiceManager.manager().createCacheFor(
|
||||
typing.cast(DeployedServicePublication, servicePool.activePublication()), services.UserDeployment.L2_CACHE
|
||||
)
|
||||
except MaxServicesReachedError:
|
||||
logger.warning('Max user services reached for %s: %s. Cache not created', servicePool.name, servicePool.max_srvs)
|
||||
# TODO: When alerts are ready, notify this
|
||||
|
||||
def reduceL1Cache(self, sp, cacheL1, cacheL2, assigned):
|
||||
logger.debug("Reducing L1 cache erasing a service in cache for {0}".format(sp))
|
||||
def reduceL1Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int):
|
||||
logger.debug("Reducing L1 cache erasing a service in cache for %s", servicePool)
|
||||
# We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation
|
||||
cacheItems = sp.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L1_CACHE)).order_by('-creation_date')
|
||||
if len(cacheItems) == 0:
|
||||
logger.debug('There is more services than configured, but could not reduce cache cause its already empty')
|
||||
cacheItems: typing.List[UserService] = list(servicePool.cachedUserServices().filter(
|
||||
UserServiceManager.getCacheStateFilter(services.UserDeployment.L1_CACHE)
|
||||
).order_by('-creation_date').iterator())
|
||||
|
||||
if not cacheItems:
|
||||
logger.debug('There is more services than max configured, but could not reduce cache L1 cause its already empty')
|
||||
return
|
||||
|
||||
if cacheL2 < sp.cache_l2_srvs:
|
||||
if cacheL2 < servicePool.cache_l2_srvs:
|
||||
valid = None
|
||||
for n in cacheItems:
|
||||
if n.needsOsManager():
|
||||
@ -222,10 +230,12 @@ class ServiceCacheUpdater(Job):
|
||||
cache = cacheItems[0]
|
||||
cache.removeOrCancel()
|
||||
|
||||
def reduceL2Cache(self, sp, cacheL1, cacheL2, assigned):
|
||||
logger.debug("Reducing L2 cache erasing a service in cache for {0}".format(sp))
|
||||
def reduceL2Cache(self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int):
|
||||
logger.debug("Reducing L2 cache erasing a service in cache for %s", servicePool.name)
|
||||
if cacheL2 > 0:
|
||||
cacheItems = sp.cachedUserServices().filter(UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)).order_by('creation_date')
|
||||
cacheItems: typing.List[UserService] = servicePool.cachedUserServices().filter(
|
||||
UserServiceManager.getCacheStateFilter(services.UserDeployment.L2_CACHE)
|
||||
).order_by('creation_date')
|
||||
# TODO: Look first for non finished cache items and cancel them
|
||||
cache = cacheItems[0]
|
||||
cache.removeOrCancel()
|
||||
@ -234,9 +244,9 @@ class ServiceCacheUpdater(Job):
|
||||
logger.debug('Starting cache checking')
|
||||
# We need to get
|
||||
servicesThatNeedsUpdate = self.servicesPoolsNeedingCacheUpdate()
|
||||
for sp, cacheL1, cacheL2, assigned in servicesThatNeedsUpdate:
|
||||
for servicePool, cacheL1, cacheL2, assigned in servicesThatNeedsUpdate:
|
||||
# We have cache to update??
|
||||
logger.debug("Updating cache for {0}".format(sp))
|
||||
logger.debug("Updating cache for %s", servicePool)
|
||||
totalL1Assigned = cacheL1 + assigned
|
||||
|
||||
# We try first to reduce cache before tring to increase it.
|
||||
@ -245,15 +255,15 @@ class ServiceCacheUpdater(Job):
|
||||
# This is so because service can have limited the number of services and,
|
||||
# if we try to increase cache before having reduced whatever needed
|
||||
# first, the service will get lock until someone removes something.
|
||||
if totalL1Assigned > sp.max_srvs:
|
||||
self.reduceL1Cache(sp, cacheL1, cacheL2, assigned)
|
||||
elif totalL1Assigned > sp.initial_srvs and cacheL1 > sp.cache_l1_srvs:
|
||||
self.reduceL1Cache(sp, cacheL1, cacheL2, assigned)
|
||||
elif cacheL2 > sp.cache_l2_srvs: # We have excesives L2 items
|
||||
self.reduceL2Cache(sp, cacheL1, cacheL2, assigned)
|
||||
elif totalL1Assigned < sp.max_srvs and (totalL1Assigned < sp.initial_srvs or cacheL1 < sp.cache_l1_srvs): # We need more services
|
||||
self.growL1Cache(sp, cacheL1, cacheL2, assigned)
|
||||
elif cacheL2 < sp.cache_l2_srvs: # We need more L2 items
|
||||
self.growL2Cache(sp, cacheL1, cacheL2, assigned)
|
||||
if totalL1Assigned > servicePool.max_srvs:
|
||||
self.reduceL1Cache(servicePool, cacheL1, cacheL2, assigned)
|
||||
elif totalL1Assigned > servicePool.initial_srvs and cacheL1 > servicePool.cache_l1_srvs:
|
||||
self.reduceL1Cache(servicePool, cacheL1, cacheL2, assigned)
|
||||
elif cacheL2 > servicePool.cache_l2_srvs: # We have excesives L2 items
|
||||
self.reduceL2Cache(servicePool, cacheL1, cacheL2, assigned)
|
||||
elif totalL1Assigned < servicePool.max_srvs and (totalL1Assigned < servicePool.initial_srvs or cacheL1 < servicePool.cache_l1_srvs): # We need more services
|
||||
self.growL1Cache(servicePool, cacheL1, cacheL2, assigned)
|
||||
elif cacheL2 < servicePool.cache_l2_srvs: # We need more L2 items
|
||||
self.growL2Cache(servicePool, cacheL1, cacheL2, assigned)
|
||||
else:
|
||||
logger.info("We have more services than max requested for {0}, but can't erase any of then cause all of them are already assigned".format(sp))
|
||||
logger.warning("We have more services than max requested for %s", servicePool.name)
|
||||
|
@ -1,6 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2013 Virtual Cable S.L.
|
||||
# Copyright (c) 2013-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -29,15 +29,15 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from uds.models import DeployedService
|
||||
from uds.models import ServicePool
|
||||
from uds.core.util.State import State
|
||||
from uds.core.util.stats import counters
|
||||
from uds.core.managers import statsManager
|
||||
from uds.core.jobs.Job import Job
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -46,25 +46,22 @@ class DeployedServiceStatsCollector(Job):
|
||||
"""
|
||||
This Job is responsible for collecting stats for every deployed service every ten minutes
|
||||
"""
|
||||
|
||||
frecuency = 599 # Once every ten minutes, 601 is prime, 599 also is prime
|
||||
friendly_name = 'Deployed Service Stats'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(DeployedServiceStatsCollector, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
logger.debug('Starting Deployed service stats collector')
|
||||
|
||||
for ds in DeployedService.objects.filter(state=State.ACTIVE):
|
||||
servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(state=State.ACTIVE).iterator()
|
||||
for servicePool in servicePoolsToCheck:
|
||||
try:
|
||||
fltr = ds.assignedUserServices().exclude(state__in=State.INFO_STATES)
|
||||
fltr = servicePool.assignedUserServices().exclude(state__in=State.INFO_STATES)
|
||||
assigned = fltr.count()
|
||||
inUse = fltr.filter(in_use=True).count()
|
||||
counters.addCounter(ds, counters.CT_ASSIGNED, assigned)
|
||||
counters.addCounter(ds, counters.CT_INUSE, inUse)
|
||||
counters.addCounter(servicePool, counters.CT_ASSIGNED, assigned)
|
||||
counters.addCounter(servicePool, counters.CT_INUSE, inUse)
|
||||
except Exception:
|
||||
logger.exception('Getting counters for deployed service {0}'.format(ds))
|
||||
logger.exception('Getting counters for service pool %s', servicePool.name)
|
||||
|
||||
logger.debug('Done Deployed service stats collector')
|
||||
|
||||
@ -84,12 +81,12 @@ class StatsCleaner(Job):
|
||||
logger.debug('Starting statistics cleanup')
|
||||
try:
|
||||
statsManager().cleanupCounters()
|
||||
except:
|
||||
except Exception:
|
||||
logger.exception('Cleaning up counters')
|
||||
|
||||
try:
|
||||
statsManager().cleanupEvents()
|
||||
except:
|
||||
except Exception:
|
||||
logger.exception('Cleaning up events')
|
||||
|
||||
logger.debug('Done statistics cleanup')
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,14 +30,14 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from uds.models import DeployedService, getSqlDatetime
|
||||
from uds.models import ServicePool, UserService, getSqlDatetime
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs.Job import Job
|
||||
from uds.core.util import log
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -52,16 +52,19 @@ class StuckCleaner(Job):
|
||||
frecuency = 3600 * 24 # Executes Once a day
|
||||
friendly_name = 'Stuck States cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(StuckCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
since_state = getSqlDatetime() - timedelta(seconds=MAX_STUCK_TIME)
|
||||
since_state: datetime = getSqlDatetime() - timedelta(seconds=MAX_STUCK_TIME)
|
||||
# Filter for locating machine not ready
|
||||
for ds in DeployedService.objects.filter(service__provider__maintenance_mode=False):
|
||||
logger.debug('Searching for stuck states for {0}'.format(ds))
|
||||
servicePoolsActive: typing.Iterable[ServicePool] = ServicePool.objects.filter(service__provider__maintenance_mode=False).iterator()
|
||||
for servicePool in servicePoolsActive:
|
||||
logger.debug('Searching for stuck states for %s', servicePool.name)
|
||||
stuckUserServices: typing.Iterable[UserService] = servicePool.userServices.filter(
|
||||
state_date__lt=since_state
|
||||
).exclude(
|
||||
state__in=State.INFO_STATES + State.VALID_STATES
|
||||
).iterator()
|
||||
# Info states are removed on UserServiceCleaner and VALID_STATES are ok, or if "hanged", checked on "HangedCleaner"
|
||||
for us in ds.userServices.filter(state_date__lt=since_state).exclude(state__in=State.INFO_STATES + State.VALID_STATES):
|
||||
logger.debug('Found stuck user service {0}'.format(us))
|
||||
log.doLog(ds, log.ERROR, 'User service {0} has been hard removed because it\'s stuck'.format(us.friendly_name))
|
||||
us.delete()
|
||||
for stuck in stuckUserServices:
|
||||
logger.debug('Found stuck user service %s', stuck)
|
||||
log.doLog(servicePool, log.ERROR, 'User service %s has been hard removed because it\'s stuck', stuck.name)
|
||||
stuck.delete()
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,15 +30,13 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from importlib import import_module
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from uds.core.util.Cache import Cache
|
||||
from uds.core.jobs.Job import Job
|
||||
from uds.models import TicketStore
|
||||
from django.conf import settings
|
||||
from importlib import import_module
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -48,9 +46,6 @@ class CacheCleaner(Job):
|
||||
frecuency = 3600 * 24 # Once a day
|
||||
friendly_name = 'Utility Cache Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(CacheCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
logger.debug('Starting cache cleanup')
|
||||
Cache.cleanUp()
|
||||
@ -62,9 +57,6 @@ class TicketStoreCleaner(Job):
|
||||
frecuency = 60 # every minute (60 seconds)
|
||||
friendly_name = 'Ticket Storage Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(TicketStoreCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
logger.debug('Starting ticket storage cleanup')
|
||||
TicketStore.cleanup()
|
||||
@ -76,9 +68,6 @@ class SessionsCleaner(Job):
|
||||
frecuency = 3600 * 24 * 7 # Once a day will be enough
|
||||
friendly_name = 'User Sessions cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(SessionsCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
logger.debug('Starting session cleanup')
|
||||
try:
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,13 +30,12 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
import logging
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
from uds.models import AccountUsage, getSqlDatetime
|
||||
from uds.core.jobs.Job import Job
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -45,9 +44,6 @@ class UsageAccounting(Job):
|
||||
frecuency = 60
|
||||
friendly_name = 'Usage Accounting update'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(UsageAccounting, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
with transaction.atomic():
|
||||
AccountUsage.objects.select_for_update().filter(user_service__in_use=True).update(end=getSqlDatetime())
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,7 +30,9 @@
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django.db import transaction
|
||||
from uds.core import managers
|
||||
@ -38,8 +40,6 @@ from uds.core.util.Config import GlobalConfig
|
||||
from uds.models import UserService, getSqlDatetime
|
||||
from uds.core.util.State import State
|
||||
from uds.core.jobs.Job import Job
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -54,12 +54,9 @@ class UserServiceInfoItemsCleaner(Job):
|
||||
frecuency_cfg = GlobalConfig.KEEP_INFO_TIME # Request run cache "info" cleaner every configured seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'User Service Info Cleaner'
|
||||
|
||||
def __init__(self, environment):
|
||||
super(UserServiceInfoItemsCleaner, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
removeFrom = getSqlDatetime() - timedelta(seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True))
|
||||
logger.debug('Removing information user services from {0}'.format(removeFrom))
|
||||
logger.debug('Removing information user services from %s', removeFrom)
|
||||
with transaction.atomic():
|
||||
UserService.objects.select_for_update().filter(state__in=State.INFO_STATES, state_date__lt=removeFrom).delete()
|
||||
|
||||
@ -69,20 +66,23 @@ class UserServiceRemover(Job):
|
||||
frecuency_cfg = GlobalConfig.REMOVAL_CHECK # Request run cache "info" cleaner every configued seconds. If config value is changed, it will be used at next reload
|
||||
friendly_name = 'User Service Cleaner'
|
||||
|
||||
removeAtOnce = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt() # Same, it will work at reload
|
||||
|
||||
def __init__(self, environment):
|
||||
super(UserServiceRemover, self).__init__(environment)
|
||||
|
||||
def run(self):
|
||||
removeAtOnce: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt() # Same, it will work at reload
|
||||
|
||||
with transaction.atomic():
|
||||
removeFrom = getSqlDatetime() - timedelta(seconds=10) # We keep at least 10 seconds the machine before removing it, so we avoid connections errors
|
||||
removables = UserService.objects.filter(state=State.REMOVABLE, state_date__lt=removeFrom,
|
||||
deployed_service__service__provider__maintenance_mode=False)[0:UserServiceRemover.removeAtOnce]
|
||||
for us in removables:
|
||||
logger.debug('Checking removal of {}'.format(us))
|
||||
removableUserServices: typing.Iterable[UserService] = UserService.objects.filter(
|
||||
state=State.REMOVABLE,
|
||||
state_date__lt=removeFrom,
|
||||
deployed_service__service__provider__maintenance_mode=False
|
||||
)[0:removeAtOnce].iterator()
|
||||
|
||||
manager = managers.userServiceManager()
|
||||
for removableUserService in removableUserServices:
|
||||
logger.debug('Checking removal of %s', removableUserService.name)
|
||||
try:
|
||||
if managers.userServiceManager().canRemoveServiceFromDeployedService(us.deployed_service) is True:
|
||||
managers.userServiceManager().remove(us)
|
||||
if manager.canRemoveServiceFromDeployedService(removableUserService.deployed_service) is True:
|
||||
manager.remove(removableUserService)
|
||||
except Exception:
|
||||
logger.exception('Exception removing user service')
|
||||
|
@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -30,8 +30,6 @@
|
||||
"""
|
||||
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -51,14 +49,14 @@ def initialize():
|
||||
# Dinamycally import children of this package.
|
||||
pkgpath = os.path.dirname(sys.modules[__name__].__file__)
|
||||
for _, name, _ in pkgutil.iter_modules([pkgpath]):
|
||||
logger.debug('Importing {}'.format(name))
|
||||
logger.debug('Importing %s', name)
|
||||
__import__(name, globals(), locals(), [], 1)
|
||||
|
||||
p = jobs.Job
|
||||
# This is marked as error in IDE, but it's not (__subclasses__)
|
||||
for cls in p.__subclasses__():
|
||||
logger.debug('Examining worker {}'.format(cls.__module__))
|
||||
logger.debug('Examining worker %s', cls.__module__)
|
||||
# Limit to autoregister just workers jobs inside this module
|
||||
if cls.__module__[0:16] == 'uds.core.workers':
|
||||
logger.debug('Added worker {} to list'.format(cls.__module__))
|
||||
logger.debug('Added worker %s to list', cls.__module__)
|
||||
TaskManager.registerJob(cls)
|
||||
|
@ -48,7 +48,6 @@ class Storage(models.Model):
|
||||
attr1 = models.CharField(max_length=64, db_index=True, null=True, blank=True, default=None)
|
||||
|
||||
# Removed old locking manager, that blocked tables
|
||||
# TODO: review that all is consistents (it should)
|
||||
|
||||
class Meta:
|
||||
"""
|
||||
@ -58,4 +57,3 @@ class Storage(models.Model):
|
||||
|
||||
def __str__(self):
|
||||
return '{} {} = {}, {}'.format(self.owner, self.key, self.data, '/'.join([self.attr1]))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user