mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Adding stats counter accumulators

This commit is contained in:
Adolfo Gómez García 2022-11-09 21:48:38 +01:00
parent 40364cdcce
commit 1cf2e2cd76
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
6 changed files with 374 additions and 38 deletions

View File

@ -36,7 +36,7 @@ import typing
from uds.core.util.config import GlobalConfig
from uds.core.util import singleton
from uds.models import StatsCounters
from uds.models import StatsCounters, StatsCountersAccum
from uds.models import getSqlDatetime, getSqlDatetimeAsUnix
from uds.models import StatsEvents
@ -73,7 +73,7 @@ class StatsManager(metaclass=singleton.Singleton):
def manager() -> 'StatsManager':
return StatsManager() # Singleton pattern will return always the same instance
def __doCleanup(self, model):
def __doCleanup(self, model: typing.Type[typing.Union['StatsCounters', 'StatsEvents']]) -> None:
minTime = time.mktime(
(
getSqlDatetime()
@ -284,3 +284,7 @@ class StatsManager(metaclass=singleton.Singleton):
"""
self.__doCleanup(StatsEvents)
def acummulate(self, max_days: int = 7) -> None:
for interval in StatsCountersAccum.IntervalType:
StatsCountersAccum.acummulate(interval, max_days)

View File

@ -32,31 +32,47 @@
import logging
import typing
from uds.models import ServicePool, Authenticator, getSqlDatetime
from django.utils.translation import gettext_lazy as _
from uds import models
from uds.core.util.state import State
from uds.core.util.stats import counters
from uds.core.managers.stats import StatsManager
from uds.core.jobs import Job
from uds.core.util import config
logger = logging.getLogger(__name__)
# Early declaration of config variables
STATS_ACCUM_FREQUENCY = config.Config.section(config.ADMIN_SECTION).value(
'Stats Accumulation Frequency',
'14400',
type=config.Config.NUMERIC_FIELD,
help=_('Frequency of stats accumulation in seconds. Default is 4 hours (14400 seconds)'),
)
STATS_ACCUM_MAX_CHUNK_TIME = config.Config.section(config.ADMIN_SECTION).value(
'Stats Accumulation Chunk',
'7',
type=config.Config.NUMERIC_FIELD,
help=_('Maximum number of days to accumulate in one run. Default is 7 (1 week)'),
)
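# Illustrative sketch, not part of the committed code: both entries are read
# back at runtime via getInt(), the accessor StatsAccumulator.run() uses below.
#   chunk_days = STATS_ACCUM_MAX_CHUNK_TIME.getInt()  # defaults to 7 (days)
#   run_seconds = STATS_ACCUM_FREQUENCY.getInt()      # defaults to 14400 (4 hours)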
class DeployedServiceStatsCollector(Job):
"""
This Job is responsible for collecting stats for every deployed service every ten minutes
"""
frecuency = 599 # Once every ten minutes, 601 is prime, 599 also is prime, i like primes... :)
frecuency = 599 # Once every ten minutes
friendly_name = 'Deployed Service Stats'
def run(self):
def run(self) -> None:
logger.debug('Starting Deployed service stats collector')
servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(
state=State.ACTIVE
).iterator()
stamp = getSqlDatetime()
servicePoolsToCheck: typing.Iterable[
models.ServicePool
] = models.ServicePool.objects.filter(state=State.ACTIVE).iterator()
stamp = models.getSqlDatetime()
# Global counters
totalAssigned, totalInUse, totalCached = 0, 0, 0
for servicePool in servicePoolsToCheck:
@ -87,14 +103,14 @@ class DeployedServiceStatsCollector(Job):
'Getting counters for service pool %s', servicePool.name
)
# Store a global "fake pool" with all stats
sp = ServicePool()
sp = models.ServicePool()
sp.id = -1
counters.addCounter(sp, counters.CT_ASSIGNED, totalAssigned, stamp=stamp)
counters.addCounter(sp, counters.CT_INUSE, totalInUse, stamp=stamp)
counters.addCounter(sp, counters.CT_CACHED, totalCached, stamp=stamp)
totalUsers, totalAssigned, totalWithService = 0, 0, 0
for auth in Authenticator.objects.all():
for auth in models.Authenticator.objects.all():
fltr = auth.users.filter(userServices__isnull=False).exclude(
userServices__state__in=State.INFO_STATES
)
@ -117,7 +133,7 @@ class DeployedServiceStatsCollector(Job):
stamp=stamp,
)
au = Authenticator()
au = models.Authenticator()
au.id = -1
counters.addCounter(au, counters.CT_AUTH_USERS, totalUsers, stamp=stamp)
counters.addCounter(au, counters.CT_AUTH_SERVICES, totalAssigned, stamp=stamp)
@ -152,3 +168,25 @@ class StatsCleaner(Job):
logger.exception('Cleaning up events')
logger.debug('Done statistics cleanup')
class StatsAccumulator(Job):
"""
This Job is responsible for compressing the stats counters table.
This is done by accumulating, for each interval type (HOUR, DAY),
every group of rows sharing the same owner_id, owner_type and counter_type.
"""
frecuency = 3600 # Fallback: hourly; the effective cadence comes from frecuency_cfg (default 14400 s = 4 hours)
frecuency_cfg = STATS_ACCUM_FREQUENCY
friendly_name = 'Statistics accumulator'
def run(self) -> None:
try:
StatsManager.manager().acummulate(STATS_ACCUM_MAX_CHUNK_TIME.getInt())
except Exception:
logger.exception('Compressing counters')
logger.debug('Done statistics compression')
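Taken together with StatsManager.acummulate() from the first file, one
StatsAccumulator run cascades both interval types. A minimal sketch, using
only names introduced in this commit (the max_days value shown is just the
default):

    for interval in StatsCountersAccum.IntervalType:  # HOUR first, then DAY
        # The HOUR pass aggregates raw StatsCounters rows; the DAY pass then
        # aggregates the freshly written HOUR rows of StatsCountersAccum
        StatsCountersAccum.acummulate(interval, max_days=7)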

View File

@ -2,7 +2,50 @@
from django.db import migrations, models
import django.db.models.deletion
import uds.models.stats_counters
import uds.models.stats_counters_accum
# Forward migration, change table type of uds_stats_c to MyISAM
# InnoDB is tremendously slow when using this table
def forwards(apps, schema_editor):
try:
from django.db import connection
# If we are not using MySQL, do nothing
if connection.vendor != 'mysql':
return
cursor = connection.cursor()
# Check current table type, if it is not InnoDB, do nothing
cursor.execute(
'SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = "uds_stats_c"'
)
if cursor.fetchone()[0] == 'InnoDB': # type: ignore
cursor.execute(
'ALTER TABLE uds_stats_c ENGINE=MyISAM'
)
except Exception: # nosec: fine
pass
# Backward migration, change table type of uds_stats_c to InnoDB
def backwards(apps, schema_editor):
return
"""
Backwards could restore the table to InnoDB, but it's not needed, and it's slow:
try:
from django.db import connection
cursor = connection.cursor()
# Check current table type, if it is not MyISAM, do nothing
cursor.execute(
'SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = "uds_stats_c"'
)
if cursor.fetchone()[0] == 'MyISAM': # type: ignore
cursor.execute('ALTER TABLE uds_stats_c ENGINE=InnoDB')
except Exception: # nosec: fine
pass
"""
class Migration(migrations.Migration):
@ -39,11 +82,6 @@ class Migration(migrations.Migration):
name='mfa',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='authenticators', to='uds.mfa'),
),
migrations.AddField(
model_name='statscounters',
name='interval_type',
field=models.SmallIntegerField(db_index=True, default=uds.models.stats_counters.StatsCounters.IntervalType['NONE']),
),
migrations.RemoveIndex(
model_name='statscounters',
name='uds_stats_c_owner_t_db894d_idx',
@ -52,4 +90,36 @@ class Migration(migrations.Migration):
model_name='statscounters',
name='uds_stats_c_owner_t_a195c1_idx',
),
migrations.CreateModel(
name='StatsCountersAccum',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('owner_id', models.IntegerField(default=0)),
('owner_type', models.SmallIntegerField(default=0)),
('counter_type', models.SmallIntegerField(default=0)),
('interval_type', models.SmallIntegerField(choices=[(1, 'HOUR'), (2, 'DAY')], default=uds.models.stats_counters_accum.StatsCountersAccum.IntervalType['HOUR'])),
('stamp', models.IntegerField(default=0)),
('v_count', models.IntegerField(default=0)),
('v_sum', models.IntegerField(default=0)),
('v_max', models.IntegerField(default=0)),
('v_min', models.IntegerField(default=0)),
],
options={
'db_table': 'uds_stats_c_accum',
},
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type', 'counter_type', 'owner_type', 'owner_id'], name='uds_stats_all'),
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type', 'counter_type'], name='uds_stats_partial'),
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
),
# Migrate uds_stats_c from Innodb to MyISAM if possible
migrations.RunPython(forwards, backwards),
]

View File

@ -71,6 +71,7 @@ from .log import Log
# Stats
from .stats_counters import StatsCounters
from .stats_counters_accum import StatsCountersAccum
from .stats_events import StatsEvents
# General utility models, such as a database cache (for caching remote content of slow connections to external services providers for example)

View File

@ -31,7 +31,6 @@
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import enum
import datetime
import logging
@ -39,7 +38,6 @@ from django.db import models
from .util import getSqlFnc
logger = logging.getLogger(__name__)
@ -48,21 +46,10 @@ class StatsCounters(models.Model):
Statistics about counters (number of users at a given time, number of services at a time, whatever...)
"""
# Valid intervals types for counters data
class IntervalType(enum.IntEnum):
NONE = 0
MINUTE = 1
HOUR = 2
DAY = 3
WEEK = 4
owner_id = models.IntegerField(db_index=True, default=0)
owner_type = models.SmallIntegerField(db_index=True, default=0)
counter_type = models.SmallIntegerField(db_index=True, default=0)
stamp = models.IntegerField(db_index=True, default=0)
interval_type = models.SmallIntegerField(
db_index=True, default=IntervalType.NONE
)
value = models.IntegerField(db_index=True, default=0)
# "fake" declarations for type checking
@ -89,9 +76,6 @@ class StatsCounters(models.Model):
q = StatsCounters.objects.filter(
owner_type__in=owner_type,
counter_type=counter_type,
interval_type=kwargs.get(
'interval_type', StatsCounters.IntervalType.NONE
),
)
if kwargs.get('owner_id'):
@ -136,7 +120,7 @@ class StatsCounters(models.Model):
floor = getSqlFnc('FLOOR')
if interval > 0:
q = q.extra(
q = q.extra( # nosec: SQL injection is not possible here
select={
'group_by_stamp': f'{floor}(stamp / {interval}) * {interval}',
},
@ -158,6 +142,4 @@ class StatsCounters(models.Model):
yield (i['group_by_stamp'], i['value'])
def __str__(self):
return u"Counter of {}({}): {} - {} - {}".format(
self.owner_type, self.owner_id, self.stamp, self.counter_type, self.value
)
return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_id}:{self.owner_type}:{self.counter_type} {self.value}'
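The FLOOR select used for grouping above buckets timestamps into fixed
intervals; the equivalent Python arithmetic, as a quick illustration:

    interval = 3600  # e.g. hourly buckets
    stamp = 7325     # arbitrary timestamp
    # group_by_stamp = FLOOR(stamp / interval) * interval -> start of the bucket
    assert (stamp // interval) * interval == 7200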

View File

@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import enum
import datetime
import logging
from django.db import models
from .util import getSqlFnc
from .stats_counters import StatsCounters
if typing.TYPE_CHECKING:
from django.db.models.query import ValuesQuerySet
logger = logging.getLogger(__name__)
class StatsCountersAccum(models.Model):
"""
Accumulated statistics for counters: per-interval count, sum, max and min of StatsCounters values
"""
# Valid intervals types for counters data
class IntervalType(enum.IntEnum):
HOUR = 1
DAY = 2
def seconds(self) -> int:
"""Returns the number of seconds for this interval type"""
if self == self.HOUR:
return 3600
if self == self.DAY:
return 86400
raise ValueError('Invalid interval type')
def prev(self) -> 'StatsCountersAccum.IntervalType':
"""Returns the previous interval type"""
if self == self.DAY:
return StatsCountersAccum.IntervalType.HOUR
raise ValueError('Invalid interval type')
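# Worked examples for the two helpers above (illustrative only):
#   IntervalType.DAY.seconds() -> 86400
#   IntervalType.DAY.prev()    -> IntervalType.HOUR
#   IntervalType.HOUR.prev()   -> raises ValueError (nothing precedes HOUR)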
owner_type = models.SmallIntegerField(default=0)
owner_id = models.IntegerField(default=0)
counter_type = models.SmallIntegerField(default=0)
interval_type = models.SmallIntegerField(
default=IntervalType.HOUR, choices=[(x.value, x.name) for x in IntervalType]
)
stamp = models.IntegerField(default=0)
# Values
v_count = models.IntegerField(default=0)
v_sum = models.IntegerField(default=0)
v_max = models.IntegerField(default=0)
v_min = models.IntegerField(default=0)
# "fake" declarations for type checking
objects: 'models.manager.Manager[StatsCountersAccum]'
class Meta:
"""
Meta class to declare db table
"""
indexes = [
models.Index(
fields=[
'stamp',
'interval_type',
'counter_type',
'owner_type',
'owner_id',
],
name='uds_stats_all',
),
models.Index(
fields=['stamp', 'interval_type', 'counter_type'],
name='uds_stats_partial',
),
models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
]
db_table = 'uds_stats_c_accum'
app_label = 'uds'
@staticmethod
def _adjust_to_interval(
value: int = -1,
interval_type: 'StatsCountersAccum.IntervalType' = IntervalType.HOUR,
) -> int:
"""Adjusts a timestamp to the given interval"""
if value == -1:
value = int(datetime.datetime.now().timestamp())
return value - (value % interval_type.seconds())
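# Worked example (illustrative): with interval_type=HOUR (3600 s),
# _adjust_to_interval(7325) -> 7200, the enclosing hour boundary;
# called with no value, "now" is floored to the same boundary.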
@staticmethod
def acummulate(interval_type: 'IntervalType', max_days: int = 7) -> None:
"""
Compresses data in the table, generating "compressed" version of the data (mean values)
"""
logger.debug('Optimizing stats counters table')
# Assign values depending on interval type
model: typing.Union[
typing.Type['StatsCountersAccum'],
typing.Type['StatsCounters'],
]
if interval_type == StatsCountersAccum.IntervalType.HOUR:
model = StatsCounters
else:
model = StatsCountersAccum
# Width of the target accumulation interval, in seconds
interval = interval_type.seconds()
# Get last stamp in table for this interval_type
start_record = (
StatsCountersAccum.objects.filter(interval_type=interval_type)
.order_by('stamp')
.last()
)
if start_record is None:
# No previous accumulated record; start from the first source record
start_record = model.objects.order_by('stamp').first()
if start_record is None: # Empty table
return
start_stamp = StatsCountersAccum._adjust_to_interval(
start_record.stamp, interval_type=interval_type
) # Adjust to interval boundary
# End date is now, adjusted to interval so we don't have "leftovers"
end_stamp = StatsCountersAccum._adjust_to_interval(interval_type=interval_type)
# If the time lapse is greater than max_days, accumulate in max_days chunks
# This avoids a single huge query that would take a long time
if end_stamp - start_stamp > (max_days * 24 * 3600):
logger.info(
'Accumulating stats counters table in chunks, because of large time lapse'
)
end_stamp = start_stamp + (max_days * 24 * 3600)
# Re-adjust end_stamp to the interval boundary
end_stamp = StatsCountersAccum._adjust_to_interval(
end_stamp, interval_type=interval_type
)
logger.debug(
'Accumulating stats counters table from %s to %s',
datetime.datetime.fromtimestamp(start_stamp),
datetime.datetime.fromtimestamp(end_stamp),
)
# Group all records in range by owner_id, owner_type and counter_type
floor = getSqlFnc('FLOOR')
query = model.objects.filter( # nosec: SQL injection is not possible here, all values are controlled
stamp__gte=start_stamp,
stamp__lt=end_stamp,
)
# When accumulating from already accumulated rows (DAY from HOUR), restrict
# the source to the previous interval type so rows of the target interval
# are not re-aggregated (this is what prev() above is for)
if model == StatsCountersAccum:
query = query.filter(interval_type=interval_type.prev())
query = (
query.extra(
select={
'group_by_stamp': f'{floor}(stamp / {interval})',
'owner_id': 'owner_id',
'owner_type': 'owner_type',
'counter_type': 'counter_type',
},
)
.values('group_by_stamp', 'owner_id', 'owner_type', 'counter_type')
)
if model == StatsCounters:
query = query.annotate(
min=models.Min('value'),
max=models.Max('value'),
count=models.Count('value'),
sum=models.Sum('value'),
)
else:
query = query.annotate(
min=models.Min('v_min'),
max=models.Max('v_max'),
count=models.Sum('v_count'),
sum=models.Sum('v_sum'),
)
"""Stores accumulated data in StatsCountersAccum"""
# Acummulate data
accumulated: typing.List[StatsCountersAccum] = [
StatsCountersAccum(
owner_type=rec['owner_type'],
owner_id=rec['owner_id'],
counter_type=rec['counter_type'],
interval_type=interval_type,
stamp=rec['group_by_stamp'] * interval_type.seconds()
+ interval_type.seconds(),
v_count=rec['count'],
v_sum=rec['sum'],
v_min=rec['min'],
v_max=rec['max'],
)
for rec in query
]
logger.debug('Inserting %s records', len(accumulated))
# Insert all accumulated records in a single bulk query
StatsCountersAccum.objects.bulk_create(accumulated)
def __str__(self) -> str:
return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_type}:{self.owner_id}:{self.counter_type} {StatsCountersAccum.IntervalType(self.interval_type)} {self.v_count},{self.v_sum},{self.v_min},{self.v_max}'
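# Illustrative sketch, not part of the committed code: reading accumulated
# data back. The mean of a bucket is derivable as v_sum / v_count; the
# counter_type parameter here is a hypothetical caller-supplied id.
def hourly_means(counter_type: int) -> typing.Iterator[typing.Tuple[int, float]]:
    for row in StatsCountersAccum.objects.filter(
        interval_type=StatsCountersAccum.IntervalType.HOUR,
        counter_type=counter_type,
    ).order_by('stamp'):
        yield row.stamp, (row.v_sum / row.v_count if row.v_count else 0.0)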