From e98483aa90f8a3b171f73c3feef0785b63a3b7d5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?=
Date: Wed, 9 Nov 2022 21:44:27 +0100
Subject: [PATCH] adding stats accumulator

---
 .../core/workers/test_stats_c_acummulator.py  | 186 ++++++++++++++
 server/src/tests/fixtures/stats_counters.py   |  71 +++++-
 server/src/uds/core/managers/stats.py         |   8 +-
 .../src/uds/core/workers/stats_collector.py   |  56 +++-
 .../uds/migrations/0043_auto_20220704_2120.py |  82 +++++-
 server/src/uds/models/stats_counters.py       |  24 +-
 server/src/uds/models/stats_counters_accum.py | 241 ++++++++++++++++++
 7 files changed, 616 insertions(+), 52 deletions(-)
 create mode 100644 server/src/tests/core/workers/test_stats_c_acummulator.py
 create mode 100644 server/src/uds/models/stats_counters_accum.py

diff --git a/server/src/tests/core/workers/test_stats_c_acummulator.py b/server/src/tests/core/workers/test_stats_c_acummulator.py
new file mode 100644
index 000000000..4cd15caf6
--- /dev/null
+++ b/server/src/tests/core/workers/test_stats_c_acummulator.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2022 Virtual Cable S.L.U.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+#  * Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+#  * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
+#    may be used to endorse or promote products derived from this software
+#    without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""
+@author: Adolfo Gómez, dkmaster at dkmon dot com
+"""
+import typing
+import datetime
+
+
+from uds import models
+from uds.core.util.stats import counters
+
+from ...utils.test import UDSTransactionTestCase
+from ...fixtures import stats_counters as fixtures_stats_counters
+
+from uds.core.workers import stats_collector
+from uds.core.environment import Environment
+
+
+START_DATE = datetime.datetime(2009, 12, 4, 0, 0, 0)
+# Some arbitrary values
+DAYS = 4
+NUMBER_PER_HOUR = 6  # Must be a divisor of 3600
+NUMBER_OF_POOLS = 11
+COUNTERS_TYPES = [counters.CT_ASSIGNED, counters.CT_INUSE]
+
+
+class StatsFunction:
+    counter: int
+    multiplier: int
+
+    def __init__(self, counter_multiplier: int = 100):
+        self.counter = 0
+        self.multiplier = counter_multiplier
+
+    def __call__(self, i: int, counter_type: int) -> int:
+        self.counter += 1
+        return self.counter * self.multiplier
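+
+
+# Note: StatsFunction(100) yields 100, 200, 300, ... on successive calls, so
+# every pool gets a strictly increasing series scaled by its own multiplier
+# (10 ** (pool_id + 1) below), which makes the expected sum/min/max of every
+# accumulated bucket easy to predict.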
+
+
+class StatsAcummulatorTest(UDSTransactionTestCase):
+    def setUp(self):
+        # Real data would be tied to UserServices, but loose data is fine for testing
+        for pool_id in range(NUMBER_OF_POOLS):
+            fixtures_stats_counters.create_stats_interval_total(
+                pool_id,
+                COUNTERS_TYPES,
+                START_DATE,
+                days=DAYS,
+                number_per_hour=NUMBER_PER_HOUR,
+                value=StatsFunction(10 ** (pool_id + 1)),
+                owner_type=counters.OT_DEPLOYED,
+            )
+
+        # Setup worker
+        stats_collector.STATS_ACCUM_MAX_CHUNK_TIME.set(DAYS // 2 + 1)
+        stats_collector.StatsAccumulator.setup()
+
+    def test_stats_accumulator(self):
+        # Ensure first that we have the correct number of base stats
+        base_stats = models.StatsCounters.objects.all()
+        total_base_stats = (
+            DAYS * 24 * NUMBER_PER_HOUR * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
+        )  # All stats
+        self.assertEqual(base_stats.count(), total_base_stats)
+
+        optimizer = stats_collector.StatsAccumulator(Environment.getTempEnv())
+        optimizer.run()
+        # First run should accumulate DAYS // 2 + 1 days (the chunk size set above)
+        hour_stats = models.StatsCountersAccum.objects.filter(
+            interval_type=models.StatsCountersAccum.IntervalType.HOUR
+        )
+        total_hour_stats = (DAYS // 2 + 1) * 24 * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
+        # Ensure that we have the correct number of hourly stats
+        self.assertEqual(hour_stats.count(), total_hour_stats)
+        # Day stats
+        day_stats = models.StatsCountersAccum.objects.filter(
+            interval_type=models.StatsCountersAccum.IntervalType.DAY
+        )
+        total_day_stats = (DAYS // 2 + 1) * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
+        self.assertEqual(day_stats.count(), total_day_stats)
+
+        # Run it again; this collects the remaining DAYS - (DAYS // 2 + 1) days
+        optimizer.run()
+        total_hour_stats = DAYS * 24 * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
+        self.assertEqual(hour_stats.count(), total_hour_stats)
+        # Day stats
+        day_stats = models.StatsCountersAccum.objects.filter(
+            interval_type=models.StatsCountersAccum.IntervalType.DAY
+        )
+        total_day_stats = DAYS * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
+        self.assertEqual(day_stats.count(), total_day_stats)
+
+        # Aggregate the base stats by hour, keyed by owner and counter type
+        data: typing.Dict[str, typing.Dict[int, typing.List[int]]] = {}
+        for i in base_stats.order_by('owner_id', 'counter_type', 'stamp'):
+            stamp = i.stamp - (i.stamp % 3600) + 3600  # Round up to the end of the hour
+            d = data.setdefault(f'{i.owner_id}{i.counter_type}', {})
+            d.setdefault(stamp, []).append(i.value)
+
+        # Check that hourly stats are correctly generated
+        stat: 'models.StatsCountersAccum'
+        for stat in hour_stats.order_by('owner_id', 'stamp'):
+            stamp = stat.stamp  # Already rounded to the end of the hour
+            d = data[f'{stat.owner_id}{stat.counter_type}']
+            self.assertEqual(stat.v_sum, sum(d[stamp]))
+            self.assertEqual(stat.v_max, max(d[stamp]))
+            self.assertEqual(stat.v_min, min(d[stamp]))
+            self.assertEqual(stat.v_count, len(d[stamp]))
+
+        # Re-aggregate, now from the hourly StatsCountersAccum rows grouped by
+        # day, and check the daily rows against them
+        day_data: typing.Dict[
+            str, typing.Dict[int, typing.List[models.StatsCountersAccum]]
+        ] = {}
+        for i in hour_stats.order_by('owner_id', 'counter_type', 'stamp'):
+            stamp = i.stamp - (i.stamp % 86400) + 86400  # Round up to the end of the day
+            d2 = day_data.setdefault(f'{i.owner_id}{i.counter_type}', {})
+            d2.setdefault(stamp, []).append(i)
+
+        for stat in day_stats.order_by('owner_id', 'stamp'):
+            stamp = stat.stamp  # Already rounded to the end of the day
+            hours = day_data[f'{stat.owner_id}{stat.counter_type}'][stamp]
+            self.assertEqual(stat.v_sum, sum(h.v_sum for h in hours))
+            self.assertEqual(stat.v_count, sum(h.v_count for h in hours))
+            self.assertEqual(stat.v_max, max(h.v_max for h in hours))
+            self.assertEqual(stat.v_min, min(h.v_min for h in hours))
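
A note on the stamp convention the test above relies on (an editorial sketch, not
part of the patch itself): accumulated rows are stamped at the *end* of their
interval, so a raw sample is assigned to its bucket by rounding its stamp up to
the next interval boundary:

    # Round a raw epoch stamp up to the end of its containing interval
    def bucket_end(stamp: int, interval: int = 3600) -> int:
        return stamp - (stamp % interval) + interval

    bucket_end(7260)  # -> 10800: 02:01 falls in the 02:00-03:00 bucket
    bucket_end(7200)  # -> 10800: a sample taken exactly on a boundary
                      #    belongs to the bucket that starts there

This mirrors the FLOOR(stamp / interval) * interval + interval expression used
by StatsCountersAccum.acummulate below.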
diff --git a/server/src/tests/fixtures/stats_counters.py b/server/src/tests/fixtures/stats_counters.py
index 060121409..9947da8d4 100644
--- a/server/src/tests/fixtures/stats_counters.py
+++ b/server/src/tests/fixtures/stats_counters.py
@@ -30,7 +30,6 @@
 """
 import typing
 import datetime
-import random
 
 from uds.core.util.stats import counters
 from uds import models
@@ -42,7 +41,8 @@ def create_stats_counters(
     counter_type: int,
     since: datetime.datetime,
     to: datetime.datetime,
-    number: int,
+    number: typing.Optional[int] = None,
+    interval: typing.Optional[int] = None,
 ) -> typing.List[models.StatsCounters]:
     '''
     Create a list of counters with the given type, counter_type, since and to, save it in the database
@@ -53,20 +53,63 @@ def create_stats_counters(
     to_stamp = int(to.timestamp())
 
     # Calculate the time interval between each counter
-    interval = (to_stamp - since_stamp) / number
-
-    counters = []
-    for i in range(number):
-        counter = models.StatsCounters()
-        counter.owner_id = owner_id
-        counter.owner_type = owner_type
-        counter.counter_type = counter_type
-        counter.stamp = since_stamp + interval * i
-        counter.value = i * 10
-        # And add it to the list
-        counters.append(counter)
+    if number is None:
+        if interval is None:
+            raise ValueError('Either number or interval must be provided')
+        number = (to_stamp - since_stamp) // interval
+    interval = (to_stamp - since_stamp) // number
+
+    counters = [
+        models.StatsCounters(
+            owner_id=owner_id,
+            owner_type=owner_type,
+            counter_type=counter_type,
+            stamp=since_stamp + interval * i,
+            value=i * 10,
+        )
+        for i in range(number)
+    ]
 
     # Bulk create the counters
     models.StatsCounters.objects.bulk_create(counters)
     return counters
+
+
+def create_stats_interval_total(
+    id: int,
+    counter_type: typing.List[int],
+    since: datetime.datetime,
+    days: int,
+    number_per_hour: int,
+    value: typing.Union[int, typing.Callable[[int, int], int]],
+    owner_type: int = counters.OT_DEPLOYED,
+) -> typing.List[models.StatsCounters]:
+    '''
+    Creates a list of counters for the given counter types, starting at "since"
+    and spanning "days" days, saves them in the database and returns them
+    '''
+    # Ensure number_per_hour fits evenly into an hour
+    if 3600 % number_per_hour != 0:
+        raise ValueError('Number of counters per hour must be a divisor of 3600')
+
+    # Time interval between consecutive counters
+    interval = 3600 // number_per_hour
+
+    since_stamp = int(since.timestamp())
+
+    if isinstance(value, int):
+        xv = value
+        value = lambda x, y: xv
+
+    cntrs = [
+        models.StatsCounters(
+            owner_id=id,
+            owner_type=owner_type,
+            counter_type=ct,
+            stamp=since_stamp + interval * i,
+            value=value(i, ct),
+        )
+        for ct in counter_type
+        for i in range(days * 24 * number_per_hour)
+    ]
+
+    # Bulk create the counters
+    models.StatsCounters.objects.bulk_create(cntrs)
+    return cntrs
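
For reference, a typical call of this fixture from a test could look like this
(a sketch; the import path and concrete values are illustrative only):

    import datetime

    from uds.core.util.stats import counters
    from tests.fixtures import stats_counters as fixtures_stats_counters

    # 2 days of CT_ASSIGNED samples for pool id 1, 6 per hour (one every 10
    # minutes); value may be an int or a callable (index, counter_type) -> int
    fixtures_stats_counters.create_stats_interval_total(
        1,
        [counters.CT_ASSIGNED],
        datetime.datetime(2009, 12, 4),
        days=2,
        number_per_hour=6,
        value=lambda i, ct: i % 100,
        owner_type=counters.OT_DEPLOYED,
    )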
diff --git a/server/src/uds/core/managers/stats.py b/server/src/uds/core/managers/stats.py
index 20012d9c1..0aa9695d5 100644
--- a/server/src/uds/core/managers/stats.py
+++ b/server/src/uds/core/managers/stats.py
@@ -36,7 +36,7 @@ import typing
 
 from uds.core.util.config import GlobalConfig
 from uds.core.util import singleton
-from uds.models import StatsCounters
+from uds.models import StatsCounters, StatsCountersAccum
 from uds.models import getSqlDatetime, getSqlDatetimeAsUnix
 from uds.models import StatsEvents
 
@@ -73,7 +73,7 @@ class StatsManager(metaclass=singleton.Singleton):
     def manager() -> 'StatsManager':
         return StatsManager()  # Singleton pattern will return always the same instance
 
-    def __doCleanup(self, model):
+    def __doCleanup(self, model: typing.Type[typing.Union['StatsCounters', 'StatsEvents']]) -> None:
         minTime = time.mktime(
             (
                 getSqlDatetime()
@@ -284,3 +284,7 @@ class StatsManager(metaclass=singleton.Singleton):
         """
 
         self.__doCleanup(StatsEvents)
+
+    def acummulate(self, max_days: int = 7):
+        for interval in StatsCountersAccum.IntervalType:
+            StatsCountersAccum.acummulate(interval, max_days)
diff --git a/server/src/uds/core/workers/stats_collector.py b/server/src/uds/core/workers/stats_collector.py
index d6d081f4e..6ac167e24 100644
--- a/server/src/uds/core/workers/stats_collector.py
+++ b/server/src/uds/core/workers/stats_collector.py
@@ -32,31 +32,47 @@ import logging
 import typing
 
-from uds.models import ServicePool, Authenticator, getSqlDatetime
+from django.utils.translation import gettext_lazy as _
+
+from uds import models
 from uds.core.util.state import State
 from uds.core.util.stats import counters
 from uds.core.managers.stats import StatsManager
 from uds.core.jobs import Job
+from uds.core.util import config
 
 logger = logging.getLogger(__name__)
 
+# Early declaration of config variables
+STATS_ACCUM_FREQUENCY = config.Config.section(config.ADMIN_SECTION).value(
+    'Stats Accumulation Frequency',
+    '14400',
+    type=config.Config.NUMERIC_FIELD,
+    help=_('Frequency of stats accumulation in seconds. Default is 4 hours (14400 seconds)'),
+)
+STATS_ACCUM_MAX_CHUNK_TIME = config.Config.section(config.ADMIN_SECTION).value(
+    'Stats Accumulation Chunk',
+    '7',
+    type=config.Config.NUMERIC_FIELD,
+    help=_('Maximum number of days to accumulate in one run. Default is 7 (1 week)'),
+)
+
 
 class DeployedServiceStatsCollector(Job):
     """
     This Job is responsible for collecting stats for every deployed service every ten minutes
     """
 
-    frecuency = 599  # Once every ten minutes, 601 is prime, 599 also is prime, i like primes... :)
+    frecuency = 599  # Once every ten minutes
     friendly_name = 'Deployed Service Stats'
 
     def run(self) -> None:
         logger.debug('Starting Deployed service stats collector')
 
-        servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(
-            state=State.ACTIVE
-        ).iterator()
-        stamp = getSqlDatetime()
+        servicePoolsToCheck: typing.Iterable[
+            models.ServicePool
+        ] = models.ServicePool.objects.filter(state=State.ACTIVE).iterator()
+        stamp = models.getSqlDatetime()
         # Global counters
         totalAssigned, totalInUse, totalCached = 0, 0, 0
         for servicePool in servicePoolsToCheck:
@@ -87,14 +103,14 @@ class DeployedServiceStatsCollector(Job):
                 'Getting counters for service pool %s', servicePool.name
             )
         # Store a global "fake pool" with all stats
-        sp = ServicePool()
+        sp = models.ServicePool()
         sp.id = -1
         counters.addCounter(sp, counters.CT_ASSIGNED, totalAssigned, stamp=stamp)
         counters.addCounter(sp, counters.CT_INUSE, totalInUse, stamp=stamp)
         counters.addCounter(sp, counters.CT_CACHED, totalCached, stamp=stamp)
 
         totalUsers, totalAssigned, totalWithService = 0, 0, 0
-        for auth in Authenticator.objects.all():
+        for auth in models.Authenticator.objects.all():
             fltr = auth.users.filter(userServices__isnull=False).exclude(
                 userServices__state__in=State.INFO_STATES
             )
@@ -117,7 +133,7 @@ class DeployedServiceStatsCollector(Job):
                 stamp=stamp,
             )
 
-        au = Authenticator()
+        au = models.Authenticator()
         au.id = -1
         counters.addCounter(au, counters.CT_AUTH_USERS, totalUsers, stamp=stamp)
         counters.addCounter(au, counters.CT_AUTH_SERVICES, totalAssigned, stamp=stamp)
@@ -152,3 +168,25 @@ class StatsCleaner(Job):
             logger.exception('Cleaning up events')
 
         logger.debug('Done statistics cleanup')
+
+
+class StatsAccumulator(Job):
+    """
+    This Job is responsible for compressing the stats counters table.
+    For each interval type (HOUR, then DAY) it aggregates, per owner_id,
+    owner_type and counter_type, the rows of the previous level into
+    sum/count/min/max buckets (see StatsCountersAccum.acummulate).
+    """
+
+    frecuency = 3600  # Default: hourly; the config value below overrides this
+    frecuency_cfg = STATS_ACCUM_FREQUENCY  # Defaults to 14400 (every 4 hours)
+    friendly_name = 'Statistics accumulator'
+
+    def run(self):
+        try:
+            StatsManager.manager().acummulate(STATS_ACCUM_MAX_CHUNK_TIME.getInt())
+        except Exception:
+            logger.exception('Compressing counters')
+
+        logger.debug('Done statistics compression')
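
Both settings are ordinary UDS config values, so they can be inspected or
changed at runtime; a minimal sketch using only the Config API calls already
used above and in the test:

    from uds.core.workers import stats_collector

    # Current accumulation frequency (seconds) and chunk size (days)
    frequency = stats_collector.STATS_ACCUM_FREQUENCY.getInt()
    chunk_days = stats_collector.STATS_ACCUM_MAX_CHUNK_TIME.getInt()

    # Shrink the chunk size, as the test's setUp() does
    stats_collector.STATS_ACCUM_MAX_CHUNK_TIME.set(3)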
diff --git a/server/src/uds/migrations/0043_auto_20220704_2120.py b/server/src/uds/migrations/0043_auto_20220704_2120.py
index e60c2330b..13d67af93 100644
--- a/server/src/uds/migrations/0043_auto_20220704_2120.py
+++ b/server/src/uds/migrations/0043_auto_20220704_2120.py
@@ -2,7 +2,50 @@
 
 from django.db import migrations, models
 import django.db.models.deletion
-import uds.models.stats_counters
+import uds.models.stats_counters_accum
+
+
+# Forward migration: change the storage engine of uds_stats_c to MyISAM.
+# InnoDB is tremendously slow when working with this table.
+def forwards(apps, schema_editor):
+    try:
+        from django.db import connection
+
+        # If we are not using MySQL, do nothing
+        if connection.vendor != 'mysql':
+            return
+
+        cursor = connection.cursor()
+        # Check the current engine; if it is not InnoDB, do nothing
+        cursor.execute(
+            'SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = "uds_stats_c"'
+        )
+        if cursor.fetchone()[0] == 'InnoDB':  # type: ignore
+            cursor.execute('ALTER TABLE uds_stats_c ENGINE=MyISAM')
+    except Exception:  # nosec: fine
+        pass
+
+
+# Backward migration: intentionally a no-op. It could restore the table to
+# InnoDB, but that is slow and not needed.
+def backwards(apps, schema_editor):
+    return
 
 
 class Migration(migrations.Migration):
@@ -39,11 +82,6 @@ class Migration(migrations.Migration):
             name='mfa',
             field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='authenticators', to='uds.mfa'),
         ),
-        migrations.AddField(
-            model_name='statscounters',
-            name='interval_type',
-            field=models.SmallIntegerField(db_index=True, default=uds.models.stats_counters.StatsCounters.IntervalType['NONE']),
-        ),
         migrations.RemoveIndex(
             model_name='statscounters',
             name='uds_stats_c_owner_t_db894d_idx',
@@ -52,4 +90,36 @@ class Migration(migrations.Migration):
             model_name='statscounters',
             name='uds_stats_c_owner_t_a195c1_idx',
         ),
+        migrations.CreateModel(
+            name='StatsCountersAccum',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('owner_id', models.IntegerField(default=0)),
+                ('owner_type', models.SmallIntegerField(default=0)),
+                ('counter_type', models.SmallIntegerField(default=0)),
+                ('interval_type', models.SmallIntegerField(choices=[(1, 'HOUR'), (2, 'DAY')], default=uds.models.stats_counters_accum.StatsCountersAccum.IntervalType['HOUR'])),
+                ('stamp', models.IntegerField(default=0)),
+                ('v_count', models.IntegerField(default=0)),
+                ('v_sum', models.IntegerField(default=0)),
+                ('v_max', models.IntegerField(default=0)),
+                ('v_min', models.IntegerField(default=0)),
+            ],
+            options={
+                'db_table': 'uds_stats_c_accum',
+            },
+        ),
+        migrations.AddIndex(
+            model_name='statscountersaccum',
+            index=models.Index(fields=['stamp', 'interval_type', 'counter_type', 'owner_type', 'owner_id'], name='uds_stats_all'),
+        ),
+        migrations.AddIndex(
+            model_name='statscountersaccum',
+            index=models.Index(fields=['stamp', 'interval_type', 'counter_type'], name='uds_stats_partial'),
+        ),
+        migrations.AddIndex(
+            model_name='statscountersaccum',
+            index=models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
+        ),
+        # Migrate uds_stats_c from InnoDB to MyISAM if possible
+        migrations.RunPython(forwards, backwards),
     ]
diff --git a/server/src/uds/models/stats_counters.py b/server/src/uds/models/stats_counters.py
index ed1fddab2..d638e0afe 100644
--- a/server/src/uds/models/stats_counters.py
+++ b/server/src/uds/models/stats_counters.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 
 #
-# Copyright (c) 2012-2022 Virtual Cable S.L.U.
+# Copyright (c) 2012-2020 Virtual Cable S.L.U.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without modification,
@@ -31,7 +31,6 @@
 .. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
 """
 import typing
-import enum
 import datetime
 import logging
 
@@ -39,7 +38,6 @@ from django.db import models
 
 from .util import getSqlFnc
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -48,21 +46,10 @@ class StatsCounters(models.Model):
     Statistics about counters (number of users at a given time, number of services at a time, whatever...)
     """
 
-    # Valid intervals types for counters data
-    class IntervalType(enum.IntEnum):
-        NONE = 0
-        MINUTE = 1
-        HOUR = 2
-        DAY = 3
-        WEEK = 4
-
     owner_id = models.IntegerField(db_index=True, default=0)
     owner_type = models.SmallIntegerField(db_index=True, default=0)
     counter_type = models.SmallIntegerField(db_index=True, default=0)
     stamp = models.IntegerField(db_index=True, default=0)
-    interval_type = models.SmallIntegerField(
-        db_index=True, default=IntervalType.NONE
-    )
     value = models.IntegerField(db_index=True, default=0)
 
     # "fake" declarations for type checking
@@ -89,9 +76,6 @@ class StatsCounters(models.Model):
         q = StatsCounters.objects.filter(
             owner_type__in=owner_type,
             counter_type=counter_type,
-            interval_type=kwargs.get(
-                'interval_type', StatsCounters.IntervalType.NONE
-            ),
         )
 
         if kwargs.get('owner_id'):
@@ -136,7 +120,7 @@ class StatsCounters(models.Model):
         floor = getSqlFnc('FLOOR')
         if interval > 0:
-            q = q.extra(  # nosec: SQL injection is not possible here, all values are integers
+            q = q.extra(  # nosec: SQL injection is not possible here
                 select={
                     'group_by_stamp': f'{floor}(stamp / {interval}) * {interval}',
                 },
@@ -158,6 +142,4 @@ class StatsCounters(models.Model):
             yield (i['group_by_stamp'], i['value'])
 
     def __str__(self):
-        return u"Counter of {}({}): {} - {} - {}".format(
-            self.owner_type, self.owner_id, self.stamp, self.counter_type, self.value
-        )
+        return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_id}:{self.owner_type}:{self.counter_type} {self.value}'
diff --git a/server/src/uds/models/stats_counters_accum.py b/server/src/uds/models/stats_counters_accum.py
new file mode 100644
index 000000000..27395ee39
--- /dev/null
+++ b/server/src/uds/models/stats_counters_accum.py
@@ -0,0 +1,241 @@
+# -*- coding: utf-8 -*-
+
+#
+# Copyright (c) 2022 Virtual Cable S.L.U.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+#  * Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+#  * Neither the name of Virtual Cable S.L. nor the names of its contributors
+#    may be used to endorse or promote products derived from this software
+#    without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""
+.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
+"""
+import typing
+import enum
+import datetime
+import logging
+
+from django.db import models
+
+from .util import getSqlFnc
+from .stats_counters import StatsCounters
+
+if typing.TYPE_CHECKING:
+    from django.db.models.query import ValuesQuerySet
+
+logger = logging.getLogger(__name__)
+
+
+class StatsCountersAccum(models.Model):
+    """
+    Accumulated counter statistics: for each hour/day interval, the sum, count,
+    maximum and minimum of the raw StatsCounters values in that interval
+    """
+
+    # Valid interval types for accumulated counters data
+    class IntervalType(enum.IntEnum):
+        HOUR = 1
+        DAY = 2
+
+        def seconds(self) -> int:
+            """Returns the number of seconds for this interval type"""
+            if self == self.HOUR:
+                return 3600
+            if self == self.DAY:
+                return 86400
+            raise ValueError('Invalid interval type')
+
+        def prev(self) -> 'StatsCountersAccum.IntervalType':
+            """Returns the interval type this one is accumulated from"""
+            if self == self.DAY:
+                return StatsCountersAccum.IntervalType.HOUR
+            raise ValueError('Invalid interval type')
+
+    owner_type = models.SmallIntegerField(default=0)
+    owner_id = models.IntegerField(default=0)
+    counter_type = models.SmallIntegerField(default=0)
+    interval_type = models.SmallIntegerField(
+        default=IntervalType.HOUR, choices=[(x.value, x.name) for x in IntervalType]
+    )
+    stamp = models.IntegerField(default=0)
+    # Values
+    v_count = models.IntegerField(default=0)
+    v_sum = models.IntegerField(default=0)
+    v_max = models.IntegerField(default=0)
+    v_min = models.IntegerField(default=0)
+
+    # "fake" declarations for type checking
+    objects: 'models.manager.Manager[StatsCountersAccum]'
+
+    class Meta:
+        """
+        Meta class to declare db table
+        """
+
+        indexes = [
+            models.Index(
+                fields=[
+                    'stamp',
+                    'interval_type',
+                    'counter_type',
+                    'owner_type',
+                    'owner_id',
+                ],
+                name='uds_stats_all',
+            ),
+            models.Index(
+                fields=['stamp', 'interval_type', 'counter_type'],
+                name='uds_stats_partial',
+            ),
+            models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
+        ]
+
+        db_table = 'uds_stats_c_accum'
+        app_label = 'uds'
+
+    @staticmethod
+    def _adjust_to_interval(
+        value: int = -1,
+        interval_type: 'StatsCountersAccum.IntervalType' = IntervalType.HOUR,
+    ) -> int:
+        """Adjusts a timestamp down to the start of the given interval ('now' if value is -1)"""
+        if value == -1:
+            value = int(datetime.datetime.now().timestamp())
+        return value - (value % interval_type.seconds())
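+
+    # Illustrative examples of the adjustment above (epoch arithmetic, UTC):
+    #   _adjust_to_interval(7260, IntervalType.HOUR) == 7200    (02:01 -> 02:00)
+    #   _adjust_to_interval(90000, IntervalType.DAY) == 86400   (day 2, 01:00 -> day 2, 00:00)
+    #   _adjust_to_interval(interval_type=IntervalType.HOUR)    ("now", floored to the hour)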
+
+    @staticmethod
+    def acummulate(interval_type: 'IntervalType', max_days: int = 7) -> None:
+        """
+        Compresses data into this table: generates, per interval bucket, one
+        row with the sum, count, max and min of the source values
+        """
+        logger.debug('Optimizing stats counters table')
+
+        # Select the source table depending on the interval type:
+        # HOUR accumulates raw StatsCounters rows, DAY accumulates HOUR rows
+        model: typing.Union[
+            typing.Type['StatsCountersAccum'],
+            typing.Type['StatsCounters'],
+        ]
+        if interval_type == StatsCountersAccum.IntervalType.HOUR:
+            model = StatsCounters
+        else:
+            model = StatsCountersAccum
+
+        interval = interval_type.seconds()
+
+        # Get last stamp in table for this interval_type
+        start_record = (
+            StatsCountersAccum.objects.filter(interval_type=interval_type)
+            .order_by('stamp')
+            .last()
+        )
+
+        if start_record is None:
+            # Nothing accumulated yet, start from the first record of the source table
+            start_record = model.objects.order_by('stamp').first()
+
+        if start_record is None:  # Empty table
+            return
+
+        start_stamp = StatsCountersAccum._adjust_to_interval(
+            start_record.stamp, interval_type=interval_type
+        )  # Adjust to the interval start
+
+        # End date is now, adjusted to the interval so we don't have "leftovers"
+        end_stamp = StatsCountersAccum._adjust_to_interval(interval_type=interval_type)
+
+        # If the time lapse is greater than max_days days, accumulate in chunks
+        # of max_days days, to avoid a single huge query taking a lot of time
+        if end_stamp - start_stamp > (max_days * 24 * 3600):
+            logger.info(
+                'Accumulating stats counters table in chunks, because of large time lapse'
+            )
+            end_stamp = start_stamp + (max_days * 24 * 3600)
+
+        # Re-adjust end_stamp to an interval boundary
+        end_stamp = StatsCountersAccum._adjust_to_interval(
+            end_stamp, interval_type=interval_type
+        )
+
+        logger.debug(
+            'Accumulating stats counters table from %s to %s',
+            datetime.datetime.fromtimestamp(start_stamp),
+            datetime.datetime.fromtimestamp(end_stamp),
+        )
+
+        # Get all source records in range, grouped by interval bucket,
+        # owner_type, counter_type and owner_id
+        query = model.objects.filter(
+            stamp__gte=start_stamp,
+            stamp__lt=end_stamp,
+        )
+        if model == StatsCountersAccum:
+            # When accumulating from this same table, read only the rows of the
+            # previous interval level (e.g. DAY is built from HOUR rows)
+            query = query.filter(interval_type=interval_type.prev())
+        floor = getSqlFnc('FLOOR')
+        query = (
+            query.extra(  # nosec: SQL injection is not possible here, all values are controlled
+                select={
+                    'group_by_stamp': f'{floor}(stamp / {interval})',
+                    'owner_id': 'owner_id',
+                    'owner_type': 'owner_type',
+                    'counter_type': 'counter_type',
+                },
+            )
+            .values('group_by_stamp', 'owner_id', 'owner_type', 'counter_type')
+        )
+
+        if model == StatsCounters:
+            query = query.annotate(
+                min=models.Min('value'),
+                max=models.Max('value'),
+                count=models.Count('value'),
+                sum=models.Sum('value'),
+            )
+        else:
+            query = query.annotate(
+                min=models.Min('v_min'),
+                max=models.Max('v_max'),
+                count=models.Sum('v_count'),
+                sum=models.Sum('v_sum'),
+            )
+
+        # Store the accumulated data in StatsCountersAccum; each row is stamped
+        # at the *end* of its interval
+        accumulated: typing.List[StatsCountersAccum] = [
+            StatsCountersAccum(
+                owner_type=rec['owner_type'],
+                owner_id=rec['owner_id'],
+                counter_type=rec['counter_type'],
+                interval_type=interval_type,
+                stamp=rec['group_by_stamp'] * interval_type.seconds()
+                + interval_type.seconds(),
+                v_count=rec['count'],
+                v_sum=rec['sum'],
+                v_min=rec['min'],
+                v_max=rec['max'],
+            )
+            for rec in query
+        ]
+
+        logger.debug('Inserting %s records', len(accumulated))
+        # Bulk create the accumulated rows in a single query
+        StatsCountersAccum.objects.bulk_create(accumulated)
+
+    def __str__(self) -> str:
+        return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_type}:{self.owner_id}:{self.counter_type} {StatsCountersAccum.IntervalType(self.interval_type)} {self.v_count},{self.v_sum},{self.v_min},{self.v_max}'
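
For manual verification, the pipeline added by this patch can be driven from a
Django shell (a sketch; only names introduced above are used):

    from uds.core.managers.stats import StatsManager
    from uds.models import StatsCountersAccum

    # Accumulate pending HOUR and DAY intervals, in chunks of up to 7 days
    StatsManager.manager().acummulate(max_days=7)

    # Inspect some of the hourly rows generated for a given pool
    for row in StatsCountersAccum.objects.filter(
        interval_type=StatsCountersAccum.IntervalType.HOUR, owner_id=1
    )[:5]:
        print(row)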