mirror of https://github.com/dkmstr/openuds.git (synced 2024-12-22 13:34:04 +03:00)
Enough linting for today... :)
commit fa8e77c750 (parent: 8adc3ca40d)
@@ -29,6 +29,8 @@
 """
 @author: Adolfo Gómez, dkmaster at dkmon dot com
 """
 
+# pylint: disable=unused-import
 from .unique_gid_generator import UniqueGIDGenerator
 from .unique_mac_generator import UniqueMacGenerator
 from .unique_name_generator import UniqueNameGenerator
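These three imports exist only to re-export the generators at the package root, so pylint would otherwise flag them as unused; the new file-level marker records that they are deliberate. A minimal sketch of the same re-export pattern, using a hypothetical sibling module gid.py:

    # __init__.py of a hypothetical package
    # pylint: disable=unused-import
    from .gid import UniqueGIDGenerator  # kept only so consumers can import it from the package root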
@@ -35,31 +35,31 @@ from collections import defaultdict
 import defusedxml.ElementTree as ET
 
 if typing.TYPE_CHECKING:
-    from xml.etree.cElementTree import Element  # nosec: Only type checking
+    from xml.etree.ElementTree import Element  # nosec: Only type checking
 
 
-def etree_to_dict(t: 'Element') -> typing.Mapping[str, typing.Any]:
-    d: typing.MutableMapping[str, typing.Any] = {}
-    if t.attrib:
-        d.update({t.tag: {}})
+def etree_to_dict(tree: 'Element') -> typing.Mapping[str, typing.Any]:
+    dct: typing.MutableMapping[str, typing.Any] = {}
+    if tree.attrib:
+        dct.update({tree.tag: {}})
 
-    children = list(t)
+    children = list(tree)
     if children:
         dd = defaultdict(list)
         for dc in map(etree_to_dict, children):
             for k, v in dc.items():
                 dd[k].append(v)
-        d = {t.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}}
-    if t.attrib:
-        d[t.tag].update(('@' + k, v) for k, v in t.attrib.items())
-    if t.text:
-        text = t.text.strip()
-        if children or t.attrib:
+        dct = {tree.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}}
+    if tree.attrib:
+        dct[tree.tag].update(('@' + k, v) for k, v in tree.attrib.items())
+    if tree.text:
+        text = tree.text.strip()
+        if children or tree.attrib:
             if text:
-                d[t.tag]['#text'] = text
+                dct[tree.tag]['#text'] = text
         else:
-            d[t.tag] = text
-    return d
+            dct[tree.tag] = text
+    return dct
 
 
 def parse(xml_string: str) -> typing.Mapping[str, typing.Any]:
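For reference, a self-contained run of the renamed helper (body copied from the new side of the hunk; the diff does not show the module path, so the function is inlined here, and the defusedxml package must be installed):

    import typing
    from collections import defaultdict

    import defusedxml.ElementTree as ET


    def etree_to_dict(tree) -> typing.Mapping[str, typing.Any]:
        dct: typing.MutableMapping[str, typing.Any] = {}
        if tree.attrib:
            dct.update({tree.tag: {}})

        children = list(tree)
        if children:
            dd = defaultdict(list)
            for dc in map(etree_to_dict, children):
                for k, v in dc.items():
                    dd[k].append(v)
            dct = {tree.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}}
        if tree.attrib:
            dct[tree.tag].update(('@' + k, v) for k, v in tree.attrib.items())
        if tree.text:
            text = tree.text.strip()
            if children or tree.attrib:
                if text:
                    dct[tree.tag]['#text'] = text
            else:
                dct[tree.tag] = text
        return dct


    root = ET.fromstring('<pool name="demo"><vm>one</vm><vm>two</vm></pool>')
    print(etree_to_dict(root))
    # {'pool': {'vm': ['one', 'two'], '@name': 'demo'}}

Attributes become '@'-prefixed keys and repeated child tags collapse into lists.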
@@ -43,8 +43,8 @@ def initialize() -> None:
     This imports all packages that are descendant of this package, and, after that,
     it register all subclases of service provider as
     """
-    from uds.core import jobs
-    from uds.core.managers import taskManager
+    from uds.core import jobs  # pylint: disable=import-outside-toplevel
+    from uds.core.managers import taskManager  # pylint: disable=import-outside-toplevel
 
     def registerer(cls: typing.Type[jobs.Job]) -> None:
         if cls.__module__.startswith('uds.core.workers'):
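Moving imports inside initialize() is the usual cure for circular imports between a package and the modules it loads; the disable comments record that the placement is deliberate. A minimal runnable sketch with a stdlib stand-in:

    def initialize() -> None:
        # Deferring the import to call time avoids import cycles; the
        # comment tells pylint this is intentional, not an oversight.
        import importlib  # pylint: disable=import-outside-toplevel

        importlib.invalidate_caches()

    initialize()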
@@ -105,9 +105,7 @@ class HangedCleaner(Job):
                 log.doLog(
                     servicePool,
                     log.ERROR,
-                    'User service {} hanged on removal. Restarting removal.'.format(
-                        us.friendly_name
-                    ),
+                    f'User service {us.friendly_name} hanged on removal. Restarting removal.',
                 )
                 us.release()  # Mark it again as removable, and let's see
             else:
@@ -120,8 +118,6 @@ class HangedCleaner(Job):
             log.doLog(
                 servicePool,
                 log.ERROR,
-                'Removing user service {} because it seems to be hanged'.format(
-                    us.friendly_name
-                ),
+                f'Removing user service {us.friendly_name} because it seems to be hanged'
             )
             us.releaseOrCancel()
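Both hunks collapse a three-line .format() call into a single f-string argument. A quick check of the equivalence, with a hypothetical stand-in value for us.friendly_name:

    us_friendly_name = 'service-01'  # hypothetical stand-in for us.friendly_name

    old_style = 'User service {} hanged on removal. Restarting removal.'.format(us_friendly_name)
    new_style = f'User service {us_friendly_name} hanged on removal. Restarting removal.'
    assert old_style == new_style  # same text, one expression instead of three lines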
@@ -32,16 +32,14 @@
 import logging
 
 from uds.core.jobs import Job
-from uds.core.util.config import GlobalConfig
-from uds.core.util.state import State
-from uds.models import ServicePool, getSqlDatetime
+# from uds.core.util.config import GlobalConfig
 
 logger = logging.getLogger(__name__)
 
 
 class Notifications(Job):
     frecuency = 60  # Once every minute
-    frecuency_cfg = GlobalConfig.CHECK_UNUSED_DELAY
+    # frecuency_cfg = GlobalConfig.XXXX
     friendly_name = 'Notifications worker'
 
     def run(self) -> None:
@@ -57,14 +57,13 @@ class SchedulerHousekeeping(Job):
         Look for "hanged" scheduler tasks and reschedule them
         """
         since = getSqlDatetime() - timedelta(minutes=MAX_EXECUTION_MINUTES)
-        for i in range(3):  # Retry three times in case of lockout error
+        for _ in range(3):  # Retry three times in case of lockout error
             try:
                 with transaction.atomic():
                     Scheduler.objects.select_for_update(skip_locked=True).filter(
                         last_execution__lt=since, state=State.RUNNING
                     ).update(owner_server='', state=State.FOR_EXECUTE)
                 break
-            except Exception as e:
+            except Exception:
                 logger.info('Retrying Scheduler cleanup transaction')
                 time.sleep(1)
 
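Renaming the loop variable to _ silences pylint's unused-variable warning, and dropping the unused as e binding does the same for the handler. The retry shape itself, sketched with a placeholder in place of the real atomic update:

    import time


    def run_with_retries(operation) -> None:
        # Up to three attempts with a one-second pause between them,
        # mirroring the lockout handling in the hunk above.
        for _ in range(3):  # '_' marks the loop index as intentionally unused
            try:
                operation()
                break
            except Exception:  # no 'as e': nothing reads the exception object
                time.sleep(1)


    run_with_retries(lambda: None)  # trivially succeeds on the first attempt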
@@ -209,7 +209,11 @@ class ServiceCacheUpdater(Job):
         return servicesPools
 
     def growL1Cache(
-        self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
+        self,
+        servicePool: ServicePool,
+        cacheL1: int,  # pylint: disable=unused-argument
+        cacheL2: int,
+        assigned: int,  # pylint: disable=unused-argument
     ) -> None:
         """
         This method tries to enlarge L1 cache.
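The three hunks that follow give growL2Cache, reduceL1Cache, and reduceL2Cache the same treatment: the one-line signature is exploded so that a per-parameter # pylint: disable=unused-argument lands exactly on the arguments each method ignores. A minimal sketch of the pattern with hypothetical names:

    def grow_cache(
        pool_name: str,
        cache_l1: int,  # pylint: disable=unused-argument
        cache_l2: int,
        assigned: int,  # pylint: disable=unused-argument
    ) -> None:
        # Only cache_l2 is consumed; the trailing comments silence pylint
        # for the parameters this particular method ignores.
        print(f'growing L2 cache of {pool_name} to {cache_l2}')


    grow_cache('demo-pool', 0, 5, 0)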
@@ -268,7 +272,11 @@ class ServiceCacheUpdater(Job):
             logger.exception('Exception')
 
     def growL2Cache(
-        self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
+        self,
+        servicePool: ServicePool,
+        cacheL1: int,  # pylint: disable=unused-argument
+        cacheL2: int,  # pylint: disable=unused-argument
+        assigned: int,  # pylint: disable=unused-argument
     ) -> None:
         """
         Tries to grow L2 cache of service.
@@ -293,7 +301,11 @@ class ServiceCacheUpdater(Job):
         # TODO: When alerts are ready, notify this
 
     def reduceL1Cache(
-        self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
+        self,
+        servicePool: ServicePool,
+        cacheL1: int,  # pylint: disable=unused-argument
+        cacheL2: int,
+        assigned: int,  # pylint: disable=unused-argument
     ):
         logger.debug("Reducing L1 cache erasing a service in cache for %s", servicePool)
         # We will try to destroy the newest cacheL1 element that is USABLE if the deployer can't cancel a new service creation
@@ -334,7 +346,11 @@ class ServiceCacheUpdater(Job):
             cache.removeOrCancel()
 
     def reduceL2Cache(
-        self, servicePool: ServicePool, cacheL1: int, cacheL2: int, assigned: int
+        self,
+        servicePool: ServicePool,
+        cacheL1: int,  # pylint: disable=unused-argument
+        cacheL2: int,
+        assigned: int,  # pylint: disable=unused-argument
     ):
         logger.debug(
             "Reducing L2 cache erasing a service in cache for %s", servicePool.name
@@ -350,7 +366,7 @@ class ServiceCacheUpdater(Job):
             .order_by('creation_date')
         )
         # TODO: Look first for non finished cache items and cancel them?
-        cache: UserService = cacheItems[0]  # type: ignore  # Slicing is not supported by pylance right now
+        cache: UserService = cacheItems[0]
         cache.removeOrCancel()
 
     def run(self) -> None:
@@ -32,8 +32,6 @@
 import logging
 import typing
 
-from django.utils.translation import gettext_lazy as _
-
 from uds import models
 from uds.core.util.state import State
 from uds.core.util.stats import counters
@@ -172,7 +170,7 @@ class StatsAccumulator(Job):
     def run(self):
         try:
             StatsManager.manager().acummulate(config.GlobalConfig.STATS_ACCUM_MAX_CHUNK_TIME.getInt())
-        except Exception as e:
+        except Exception:
             logger.exception('Compressing counters')
 
         logger.debug('Done statistics compression')
@@ -84,7 +84,7 @@ class StuckCleaner(Job):
         # Info states are removed on UserServiceCleaner and VALID_STATES are ok, or if "hanged", checked on "HangedCleaner"
         def stuckUserServices(servicePool: ServicePool) -> typing.Iterable[UserService]:
             q = servicePool.userServices.filter(state_date__lt=since_state)
             # Get all that are not in valid or info states, AND the ones that are "PREPARING" with
             # "destroy_after" property set (exists) (that means that are waiting to be destroyed after initializations)
             yield from q.exclude(state__in=State.INFO_STATES + State.VALID_STATES)
             yield from q.filter(state=State.PREPARING, properties__name='destroy_after')
@@ -96,9 +96,7 @@ class StuckCleaner(Job):
             log.doLog(
                 servicePool,
                 log.ERROR,
-                'User service {} has been hard removed because it\'s stuck'.format(
-                    stuck.name
-                ),
+                f'User service {stuck.name} has been hard removed because it\'s stuck',
             )
             # stuck.setState(State.ERROR)
             stuck.delete()
@@ -78,7 +78,7 @@ class UserServiceRemover(Job):
         # This configuration value is cached at startup, so it is not updated until next reload
         removeAtOnce: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt()
         manager = managers.userServiceManager()
 
         with transaction.atomic():
             removeFrom = getSqlDatetime() - timedelta(
                 seconds=10
@@ -89,7 +89,9 @@ class UserServiceRemover(Job):
                 state=State.REMOVABLE,
                 state_date__lt=removeFrom,
                 deployed_service__service__provider__maintenance_mode=False,
-            ).iterator(chunk_size=removeAtOnce)
+            ).iterator(
+                chunk_size=removeAtOnce
+            )
 
         # We remove at once, but we limit the number of items to remove
 
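The last hunk is purely cosmetic (black-style wrapping of the .iterator(chunk_size=...) call). chunk_size bounds how many rows Django fetches per database round trip; a rough pure-Python analogue of that batching, with hypothetical names:

    import itertools
    import typing


    def chunked(iterable: typing.Iterable[int], chunk_size: int) -> typing.Iterator[typing.List[int]]:
        # Yield fixed-size batches, the way a DB cursor hands back rows.
        it = iter(iterable)
        while chunk := list(itertools.islice(it, chunk_size)):
            yield chunk


    print(list(chunked(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]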