
adding stats acummulator

This commit is contained in:
Adolfo Gómez García 2022-11-09 21:44:27 +01:00
parent 7799dcd58e
commit e98483aa90
7 changed files with 616 additions and 52 deletions
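
In outline, the accumulator rolls raw StatsCounters rows up into hourly buckets, and hourly buckets into daily ones, keeping count, sum, min and max per bucket. A minimal standalone sketch of that idea (hypothetical code, not the UDS API itself):

from collections import defaultdict
import typing

HOUR = 3600

def accumulate(samples: typing.List[typing.Tuple[int, int]], interval: int = HOUR) -> typing.Dict[int, typing.Dict[str, int]]:
    # Group (stamp, value) samples into interval buckets, labeling each
    # bucket by the END of the interval it covers, as the real code does
    buckets: typing.Dict[int, typing.List[int]] = defaultdict(list)
    for stamp, value in samples:
        buckets[stamp - (stamp % interval) + interval].append(value)
    return {
        end: {'count': len(v), 'sum': sum(v), 'min': min(v), 'max': max(v)}
        for end, v in buckets.items()
    }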


@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import datetime
from uds import models
from uds.core.util.stats import counters
from ...utils.test import UDSTransactionTestCase
from ...fixtures import stats_counters as fixtures_stats_counters
from uds.core.workers import stats_collector
from uds.core.environment import Environment
START_DATE = datetime.datetime(2009, 12, 4, 0, 0, 0)
# Some arbitrary test values
DAYS = 4
NUMBER_PER_HOUR = 6  # Must be a divisor of 3600
NUMBER_OF_POOLS = 11
COUNTERS_TYPES = [counters.CT_ASSIGNED, counters.CT_INUSE]
class StatsFunction:
counter: int
multiplier: int
def __init__(self, counter_multiplier: int = 100):
self.counter = 0
self.multiplier = counter_multiplier
def __call__(self, i: int, number_per_hour: int) -> int:
self.counter += 1
return self.counter * self.multiplier
class StatsAcummulatorTest(UDSTransactionTestCase):
def setUp(self):
# Real data would be attached to user services, but standalone rows are fine for testing
for pool_id in range(NUMBER_OF_POOLS):
fixtures_stats_counters.create_stats_interval_total(
pool_id,
COUNTERS_TYPES,
START_DATE,
days=DAYS,
number_per_hour=NUMBER_PER_HOUR,
value=StatsFunction(10 ** (pool_id + 1)),
owner_type=counters.OT_DEPLOYED,
)
# Setup worker
stats_collector.STATS_ACCUM_MAX_CHUNK_TIME.set(DAYS // 2 + 1)
stats_collector.StatsAccumulator.setup()
def test_stats_accumulator(self):
# Ensure first that we have correct number of base stats
base_stats = models.StatsCounters.objects.all()
total_base_stats = (
DAYS * 24 * NUMBER_PER_HOUR * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
) # All stats
self.assertEqual(base_stats.count(), total_base_stats)
optimizer = stats_collector.StatsAccumulator(Environment.getTempEnv())
optimizer.run()
# Should have (DAYS // 2 + 1) days of hourly stats
hour_stats = models.StatsCountersAccum.objects.filter(
interval_type=models.StatsCountersAccum.IntervalType.HOUR
)
total_hour_stats = (DAYS // 2 + 1) * 24 * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
# Ensure that we have correct number of stats
self.assertEqual(hour_stats.count(), total_hour_stats)
# Days stats
day_stats = models.StatsCountersAccum.objects.filter(
interval_type=models.StatsCountersAccum.IntervalType.DAY
)
total_day_stats = (DAYS // 2 + 1) * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
self.assertEqual(day_stats.count(), total_day_stats)
# Run it again; this collects the remaining DAYS - (DAYS // 2 + 1) days
optimizer.run()
# After the second run, all DAYS days have been accumulated
total_hour_stats = DAYS * 24 * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
self.assertEqual(hour_stats.count(), total_hour_stats)
# Days stats
day_stats = models.StatsCountersAccum.objects.filter(
interval_type=models.StatsCountersAccum.IntervalType.DAY
)
total_day_stats = DAYS * NUMBER_OF_POOLS * len(COUNTERS_TYPES)
self.assertEqual(day_stats.count(), total_day_stats)
# Calculate sum of stats, by hour
data: typing.Dict[str, typing.Dict[int, typing.List[int]]] = {}
for i in base_stats.order_by('owner_id', 'counter_type', 'stamp'):
stamp = i.stamp - (i.stamp % 3600) + 3600  # Round up to the next hour boundary
d = data.setdefault(f'{i.owner_id}{i.counter_type}', {})
d.setdefault(stamp, []).append(i.value)
# The last hour is never complete (until the next hour begins), so it is not included in the hourly stats
# Check that hourly stats are correctly generated
stat: 'models.StatsCountersAccum'
for stat in hour_stats.order_by('owner_id', 'stamp'):
stamp = stat.stamp # Already rounded to hour
d = data[f'{stat.owner_id}{stat.counter_type}']
self.assertEqual(stat.v_sum, sum(d[stamp]))
self.assertEqual(stat.v_max, max(d[stamp]))
self.assertEqual(stat.v_min, min(d[stamp]))
self.assertEqual(stat.v_count, len(d[stamp]))
# Recalculate sum of stats, now from StatsCountersAccum (hourly)
data: typing.Dict[str, typing.Dict[int, typing.List[int]]] = {}
for i in hour_stats.order_by('owner_id', 'counter_type', 'stamp'):
pass  # Daily cross-check from hourly data not implemented yet
return
# NOTE: everything below is unreachable leftover code from an earlier
# design; it references a StatsCounters.IntervalOperation API that does
# not exist after this commit
# Calculate sum of stats, by hour, day
data: typing.Dict[int, typing.Dict[int, typing.List[int]]] = {}
for i in base_stats.order_by('owner_id', 'stamp'):
stamp = i.stamp - (i.stamp % 3600) + 3600 # Round to hour and to next hour
d = data.setdefault(i.owner_id, {})
d.setdefault(stamp, []).append(i.value)
# Last hour NEVER is completed (until next hour appears), so it's not included in hour stats
# Check that hourly stats are correctly generated
for i in hour_stats.order_by('owner_id', 'stamp'):
stamp = i.stamp # Already rounded to hour
d = data[i.owner_id]
if i.interval_operation == models.StatsCounters.IntervalOperation.AVG:
self.assertEqual(i.value, sum(d[stamp]) // len(d[stamp]))
else:
self.assertEqual(i.value, max(d[stamp]))
# Now check day stats, max and avg
for op in (
models.StatsCounters.IntervalOperation.AVG,
models.StatsCounters.IntervalOperation.MAX,
):
data = {}
for i in hour_stats.filter(interval_operation=op).order_by(
'owner_id', 'stamp'
):
stamp = i.stamp - (i.stamp % 86400) + 86400
d = data.setdefault(i.owner_id, {})
d.setdefault(stamp, []).append(i.value)
# Last day NEVER is completed (until next day appears), so it's not included in day stats
for i in day_stats.filter(interval_operation=op).order_by(
'owner_id', 'stamp'
):
stamp = i.stamp # Already rounded to day
d = data[i.owner_id]
if i.interval_operation == models.StatsCounters.IntervalOperation.AVG:
self.assertEqual(i.value, sum(d[stamp]) // len(d[stamp]))
else:
self.assertEqual(i.value, max(d[stamp]))
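
For reference, the row counts asserted in this test follow from the constants at the top (DAYS = 4, NUMBER_PER_HOUR = 6, NUMBER_OF_POOLS = 11, two counter types); plain arithmetic:

base_rows     = 4 * 24 * 6 * 11 * 2         # 12672 raw StatsCounters rows
hourly_first  = (4 // 2 + 1) * 24 * 11 * 2  # 1584 hourly rows after the first run
daily_first   = (4 // 2 + 1) * 11 * 2       # 66 daily rows after the first run
hourly_second = 4 * 24 * 11 * 2             # 2112 hourly rows after the second run
daily_second  = 4 * 11 * 2                  # 88 daily rows after the second run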


@ -30,7 +30,6 @@
"""
import typing
import datetime
import random
from uds.core.util.stats import counters
from uds import models
@ -42,7 +41,8 @@ def create_stats_counters(
counter_type: int,
since: datetime.datetime,
to: datetime.datetime,
number: int,
number: typing.Optional[int] = None,
interval: typing.Optional[int] = None,
) -> typing.List[models.StatsCounters]:
'''
Create a list of counters with the given type, counter_type, since and to, save it in the database
@ -53,20 +53,63 @@ def create_stats_counters(
to_stamp = int(to.timestamp())
# Calculate the time interval between each counter
interval = (to_stamp - since_stamp) / number
counters = []
for i in range(number):
counter = models.StatsCounters()
counter.owner_id = owner_id
counter.owner_type = owner_type
counter.counter_type = counter_type
counter.stamp = since_stamp + interval * i
counter.value = i * 10
# And add it to the list
counters.append(counter)
if number is None:
if interval is None:
raise ValueError('Either number or interval must be provided')
number = (to_stamp - since_stamp) // interval
interval = (to_stamp - since_stamp) // number
counters = [
models.StatsCounters(
owner_id=owner_id,
owner_type=owner_type,
counter_type=counter_type,
stamp=since_stamp + interval * i,
value=i*10,
)
for i in range(number)
]
# Bulk create the counters
models.StatsCounters.objects.bulk_create(counters)
return counters
def create_stats_interval_total(
id: int,
counter_type: typing.List[int],
since: datetime.datetime,
days: int,
number_per_hour: int,
value: typing.Union[int, typing.Callable[[int, int], int]],
owner_type: int = counters.OT_DEPLOYED,
) -> typing.List[models.StatsCounters]:
'''
Creates a list of counters with the given type, counter_type, since and to, save it in the database
and return it
'''
# Calculate the time interval between each counter
# Ensure number_per_hour fits evenly into an hour
if 3600 % number_per_hour != 0:
raise ValueError('Number of counters per hour must be a divisor of 3600')
interval = 3600 // number_per_hour
since_stamp = int(since.timestamp())
if isinstance(value, int):
xv = value
value = lambda x, y: xv
cntrs = [
models.StatsCounters(
owner_id=id,
owner_type=owner_type,
counter_type=ct,
stamp=since_stamp + interval * i,
value=value(i, ct),
) for ct in counter_type for i in range(days * 24 * number_per_hour)
]
# Bulk create the counters
models.StatsCounters.objects.bulk_create(cntrs)
return cntrs
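
As exercised by the test above, the new helper accepts either a fixed int or a callable for value; a usage sketch (assumes a test database, mirrors the call in StatsAcummulatorTest.setUp):

import datetime
from uds.core.util.stats import counters

rows = create_stats_interval_total(
    1,                                  # owner (pool) id
    [counters.CT_ASSIGNED, counters.CT_INUSE],
    datetime.datetime(2009, 12, 4),
    days=4,
    number_per_hour=6,                  # must divide 3600 evenly
    value=lambda i, ct: i * 10,         # or simply an int
    owner_type=counters.OT_DEPLOYED,
)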


@ -36,7 +36,7 @@ import typing
from uds.core.util.config import GlobalConfig
from uds.core.util import singleton
from uds.models import StatsCounters
from uds.models import StatsCounters, StatsCountersAccum
from uds.models import getSqlDatetime, getSqlDatetimeAsUnix
from uds.models import StatsEvents
@ -73,7 +73,7 @@ class StatsManager(metaclass=singleton.Singleton):
def manager() -> 'StatsManager':
return StatsManager() # Singleton pattern will return always the same instance
def __doCleanup(self, model):
def __doCleanup(self, model: typing.Type[typing.Union['StatsCounters', 'StatsEvents']]) -> None:
minTime = time.mktime(
(
getSqlDatetime()
@ -284,3 +284,7 @@ class StatsManager(metaclass=singleton.Singleton):
"""
self.__doCleanup(StatsEvents)
def acummulate(self, max_days: int = 7):
"""Rolls raw counters up into hourly and daily accumulated rows"""
for interval in StatsCountersAccum.IntervalType:
StatsCountersAccum.acummulate(interval, max_days)
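
The new entry point iterates both interval types, so one call builds hourly buckets from raw counters and daily buckets from the hourly ones. A sketch of how the worker below drives it (max_days bounds how much pending data one run accumulates):

from uds.core.managers.stats import StatsManager

# Accumulate at most 7 days of pending data per invocation
StatsManager.manager().acummulate(max_days=7)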


@ -32,31 +32,47 @@
import logging
import typing
from uds.models import ServicePool, Authenticator, getSqlDatetime
from django.utils.translation import gettext_lazy as _
from uds import models
from uds.core.util.state import State
from uds.core.util.stats import counters
from uds.core.managers.stats import StatsManager
from uds.core.jobs import Job
from uds.core.util import config
logger = logging.getLogger(__name__)
# Early declaration of config variable
STATS_ACCUM_FREQUENCY = config.Config.section(config.ADMIN_SECTION).value(
'Stats Accumulation Frequency',
'14400',
type=config.Config.NUMERIC_FIELD,
help=_('Frequency of stats accumulation in seconds. Default is 4 hours (14400 seconds)'),
)
STATS_ACCUM_MAX_CHUNK_TIME = config.Config.section(config.ADMIN_SECTION).value(
'Stats Accumulation Chunk',
'7',
type=config.Config.NUMERIC_FIELD,
help=_('Maximum number of days to accumulate in one run. Default is 7 (one week)'),
)
class DeployedServiceStatsCollector(Job):
"""
This Job is responsible for collecting stats for every deployed service every ten minutes
"""
frecuency = 599 # Once every ten minutes, 601 is prime, 599 also is prime, i like primes... :)
frecuency = 599 # Once every ten minutes
friendly_name = 'Deployed Service Stats'
def run(self) -> None:
logger.debug('Starting Deployed service stats collector')
servicePoolsToCheck: typing.Iterable[ServicePool] = ServicePool.objects.filter(
state=State.ACTIVE
).iterator()
stamp = getSqlDatetime()
servicePoolsToCheck: typing.Iterable[
models.ServicePool
] = models.ServicePool.objects.filter(state=State.ACTIVE).iterator()
stamp = models.getSqlDatetime()
# Global counters
totalAssigned, totalInUse, totalCached = 0, 0, 0
for servicePool in servicePoolsToCheck:
@ -87,14 +103,14 @@ class DeployedServiceStatsCollector(Job):
'Getting counters for service pool %s', servicePool.name
)
# Store a global "fake pool" with all stats
sp = ServicePool()
sp = models.ServicePool()
sp.id = -1
counters.addCounter(sp, counters.CT_ASSIGNED, totalAssigned, stamp=stamp)
counters.addCounter(sp, counters.CT_INUSE, totalInUse, stamp=stamp)
counters.addCounter(sp, counters.CT_CACHED, totalCached, stamp=stamp)
totalUsers, totalAssigned, totalWithService = 0, 0, 0
for auth in Authenticator.objects.all():
for auth in models.Authenticator.objects.all():
fltr = auth.users.filter(userServices__isnull=False).exclude(
userServices__state__in=State.INFO_STATES
)
@ -117,7 +133,7 @@ class DeployedServiceStatsCollector(Job):
stamp=stamp,
)
au = Authenticator()
au = models.Authenticator()
au.id = -1
counters.addCounter(au, counters.CT_AUTH_USERS, totalUsers, stamp=stamp)
counters.addCounter(au, counters.CT_AUTH_SERVICES, totalAssigned, stamp=stamp)
@ -152,3 +168,25 @@ class StatsCleaner(Job):
logger.exception('Cleaning up events')
logger.debug('Done statistics cleanup')
class StatsAccumulator(Job):
"""
This Job is responsible for compressing the stats tables: for each
interval type (HOUR, DAY), rows sharing the same owner_id, owner_type
and counter_type are accumulated into a single StatsCountersAccum row
per interval.
"""
frecuency = 3600  # Once per hour by default; overridden by frecuency_cfg below
frecuency_cfg = STATS_ACCUM_FREQUENCY  # Default is 14400 (4 hours)
friendly_name = 'Statistics acummulator'
def run(self):
try:
StatsManager.manager().acummulate(STATS_ACCUM_MAX_CHUNK_TIME.getInt())
except Exception:
logger.exception('Error accumulating statistics')
logger.debug('Done statistics accumulation')
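
The unit test tunes the chunk size through the config value before running the job; a sketch of driving the accumulator by hand the same way:

from uds.core.environment import Environment
from uds.core.workers import stats_collector

stats_collector.STATS_ACCUM_MAX_CHUNK_TIME.set(3)  # accumulate at most 3 days per run
stats_collector.StatsAccumulator(Environment.getTempEnv()).run()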


@ -2,7 +2,50 @@
from django.db import migrations, models
import django.db.models.deletion
import uds.models.stats_counters
import uds.models.stats_counters_accum
# Forward migration, change table type of uds_stats_c to MyISAM
# InnoDB is tremendously slow when using this table
def forwards(apps, schema_editor):
try:
from django.db import connection
# If we are not using MySQL, do nothing
if connection.vendor != 'mysql':
return
cursor = connection.cursor()
# Check current table type, if it is not InnoDB, do nothing
cursor.execute(
'SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = "uds_stats_c"'
)
if cursor.fetchone()[0] == 'InnoDB': # type: ignore
cursor.execute(
'ALTER TABLE uds_stats_c ENGINE=MyISAM'
)
except Exception: # nosec: fine
pass
# Backward migration: intentionally a no-op (restoring InnoDB would be slow and is not needed)
def backwards(apps, schema_editor):
return
"""
Backwards could restore the table to InnoDB, but it's not needed, and it's slow:
try:
from django.db import connection
cursor = connection.cursor()
# Check current table type, if it is not MyISAM, do nothing
cursor.execute(
'SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = "uds_stats_c"'
)
if cursor.fetchone()[0] == 'MyISAM':  # type: ignore
cursor.execute('ALTER TABLE uds_stats_c ENGINE=InnoDB')
except Exception: # nosec: fine
pass
"""
class Migration(migrations.Migration):
@ -39,11 +82,6 @@ class Migration(migrations.Migration):
name='mfa',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='authenticators', to='uds.mfa'),
),
migrations.AddField(
model_name='statscounters',
name='interval_type',
field=models.SmallIntegerField(db_index=True, default=uds.models.stats_counters.StatsCounters.IntervalType['NONE']),
),
migrations.RemoveIndex(
model_name='statscounters',
name='uds_stats_c_owner_t_db894d_idx',
@ -52,4 +90,36 @@ class Migration(migrations.Migration):
model_name='statscounters',
name='uds_stats_c_owner_t_a195c1_idx',
),
migrations.CreateModel(
name='StatsCountersAccum',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('owner_id', models.IntegerField(default=0)),
('owner_type', models.SmallIntegerField(default=0)),
('counter_type', models.SmallIntegerField(default=0)),
('interval_type', models.SmallIntegerField(choices=[(2, 'HOUR'), (3, 'DAY')], default=uds.models.stats_counters_accum.StatsCountersAccum.IntervalType['HOUR'])),
('stamp', models.IntegerField(default=0)),
('v_count', models.IntegerField(default=0)),
('v_sum', models.IntegerField(default=0)),
('v_max', models.IntegerField(default=0)),
('v_min', models.IntegerField(default=0)),
],
options={
'db_table': 'uds_stats_c_accum',
},
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type', 'counter_type', 'owner_type', 'owner_id'], name='uds_stats_all'),
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type', 'counter_type'], name='uds_stats_partial'),
),
migrations.AddIndex(
model_name='statscountersaccum',
index=models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
),
# Migrate uds_stats_c from Innodb to MyISAM if possible
migrations.RunPython(forwards, backwards),
]


@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -31,7 +31,6 @@
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import enum
import datetime
import logging
@ -39,7 +38,6 @@ from django.db import models
from .util import getSqlFnc
logger = logging.getLogger(__name__)
@ -48,21 +46,10 @@ class StatsCounters(models.Model):
Statistics about counters (number of users at a given time, number of services at a time, whatever...)
"""
# Valid intervals types for counters data
class IntervalType(enum.IntEnum):
NONE = 0
MINUTE = 1
HOUR = 2
DAY = 3
WEEK = 4
owner_id = models.IntegerField(db_index=True, default=0)
owner_type = models.SmallIntegerField(db_index=True, default=0)
counter_type = models.SmallIntegerField(db_index=True, default=0)
stamp = models.IntegerField(db_index=True, default=0)
interval_type = models.SmallIntegerField(
db_index=True, default=IntervalType.NONE
)
value = models.IntegerField(db_index=True, default=0)
# "fake" declarations for type checking
@ -89,9 +76,6 @@ class StatsCounters(models.Model):
q = StatsCounters.objects.filter(
owner_type__in=owner_type,
counter_type=counter_type,
interval_type=kwargs.get(
'interval_type', StatsCounters.IntervalType.NONE
),
)
if kwargs.get('owner_id'):
@ -136,7 +120,7 @@ class StatsCounters(models.Model):
floor = getSqlFnc('FLOOR')
if interval > 0:
q = q.extra( # nosec: SQL injection is not possible here, all values are integers
q = q.extra( # nosec: SQL injection is not possible here
select={
'group_by_stamp': f'{floor}(stamp / {interval}) * {interval}',
},
@ -158,6 +142,4 @@ class StatsCounters(models.Model):
yield (i['group_by_stamp'], i['value'])
def __str__(self):
return u"Counter of {}({}): {} - {} - {}".format(
self.owner_type, self.owner_id, self.stamp, self.counter_type, self.value
)
return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_id}:{self.owner_type}:{self.counter_type} {self.value}'


@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import enum
import datetime
import logging
from django.db import models
from .util import getSqlFnc
from .stats_counters import StatsCounters
if typing.TYPE_CHECKING:
from django.db.models.query import ValuesQuerySet
logger = logging.getLogger(__name__)
class StatsCountersAccum(models.Model):
"""
Accumulated (per hour/day) statistics about counters (number of users at a given time, number of services at a time, etc.)
"""
# Valid interval types for accumulated counters data
# (values match the migration choices in this commit: (2, 'HOUR'), (3, 'DAY'))
class IntervalType(enum.IntEnum):
HOUR = 2
DAY = 3
def seconds(self) -> int:
"""Returns the number of seconds for this interval type"""
if self == self.HOUR:
return 3600
if self == self.DAY:
return 86400
raise ValueError('Invalid interval type')
def prev(self) -> 'StatsCountersAccum.IntervalType':
"""Returns the previous interval type"""
if self == self.DAY:
return StatsCountersAccum.IntervalType.HOUR
raise ValueError('Invalid interval type')
owner_type = models.SmallIntegerField(default=0)
owner_id = models.IntegerField(default=0)
counter_type = models.SmallIntegerField(default=0)
interval_type = models.SmallIntegerField(
default=IntervalType.HOUR, choices=[(x.value, x.name) for x in IntervalType]
)
stamp = models.IntegerField(default=0)
# Values
v_count = models.IntegerField(default=0)
v_sum = models.IntegerField(default=0)
v_max = models.IntegerField(default=0)
v_min = models.IntegerField(default=0)
# "fake" declarations for type checking
objects: 'models.manager.Manager[StatsCountersAccum]'
class Meta:
"""
Meta class to declare db table
"""
indexes = [
models.Index(
fields=[
'stamp',
'interval_type',
'counter_type',
'owner_type',
'owner_id',
],
name='uds_stats_all',
),
models.Index(
fields=['stamp', 'interval_type', 'counter_type'],
name='uds_stats_partial',
),
models.Index(fields=['stamp', 'interval_type'], name='uds_stats_stamp'),
]
db_table = 'uds_stats_c_accum'
app_label = 'uds'
@staticmethod
def _adjust_to_interval(
value: int = -1,
interval_type: 'StatsCountersAccum.IntervalType' = IntervalType.HOUR,
) -> int:
"""Adjusts a timestamp to the given interval"""
if value == -1:
value = int(datetime.datetime.now().timestamp())
return value - (value % interval_type.seconds())
@staticmethod
def acummulate(interval_type: 'IntervalType', max_days: int = 7) -> None:
"""
Compresses data in the table, storing per-interval aggregates (count, sum, min and max) in StatsCountersAccum
"""
logger.debug('Optimizing stats counters table')
# Assign values depending on interval type
model: typing.Union[
typing.Type['StatsCountersAccum'],
typing.Type['StatsCounters'],
]
if interval_type == StatsCountersAccum.IntervalType.HOUR:
model = StatsCounters
else:
model = StatsCountersAccum
# Accumulate HOURS from StatsCounters
interval = interval_type.seconds()
# Get last stamp in table for this interval_type
start_record = (
StatsCountersAccum.objects.filter(interval_type=interval_type)
.order_by('stamp')
.last()
)
if start_record is None:
# No last stamp record, start from first StatsCounters record
start_record = model.objects.order_by('stamp').first()
if start_record is None: # Empty table
return
start_stamp = StatsCountersAccum._adjust_to_interval(
start_record.stamp, interval_type=interval_type
) # Adjust to hour
# End date is now, adjusted to the interval so we don't have 'leftovers'
end_stamp = StatsCountersAccum._adjust_to_interval(interval_type=interval_type)
# If the time span is greater than max_days, accumulate in chunks of
# max_days to avoid a huge query that would take a long time
if end_stamp - start_stamp > (max_days * 24 * 3600):
logger.info(
'Accumulating stats counters table in chunks, because of large time lapse'
)
end_stamp = start_stamp + (max_days * 24 * 3600)
# Re-adjust the chunked end_stamp to an interval boundary
end_stamp = StatsCountersAccum._adjust_to_interval(
end_stamp, interval_type=interval_type
)
logger.debug(
'Accumulating stats counters table from %s to %s',
datetime.datetime.fromtimestamp(start_stamp),
datetime.datetime.fromtimestamp(end_stamp),
)
# Get all records for this owner_type, counter_type, owner_id
floor = getSqlFnc('FLOOR')
query = (
model.objects.filter( # nosec: SQL injection is not possible here, all values are controlled
stamp__gte=start_stamp,
stamp__lt=end_stamp,
)
.extra(
select={
'group_by_stamp': f'{floor}(stamp / {interval})',
'owner_id': 'owner_id',
'owner_type': 'owner_type',
'counter_type': 'counter_type',
},
)
.values('group_by_stamp', 'owner_id', 'owner_type', 'counter_type')
)
if model == StatsCounters:
query = query.annotate(
min=models.Min('value'),
max=models.Max('value'),
count=models.Count('value'),
sum=models.Sum('value'),
)
else:
query = query.annotate(
min=models.Min('v_min'),
max=models.Max('v_max'),
count=models.Sum('v_count'),
sum=models.Sum('v_sum'),
)
"""Stores accumulated data in StatsCountersAccum"""
# Acummulate data
accumulated: typing.List[StatsCountersAccum] = [
StatsCountersAccum(
owner_type=rec['owner_type'],
owner_id=rec['owner_id'],
counter_type=rec['counter_type'],
interval_type=interval_type,
stamp=rec['group_by_stamp'] * interval_type.seconds()
+ interval_type.seconds(),
v_count=rec['count'],
v_sum=rec['sum'],
v_min=rec['min'],
v_max=rec['max'],
)
for rec in query
]
logger.debug('Inserting %s records', len(accumulated))
# Insert all accumulated rows in a single bulk query
StatsCountersAccum.objects.bulk_create(accumulated)
def __str__(self) -> str:
return f'{datetime.datetime.fromtimestamp(self.stamp)} - {self.owner_type}:{self.owner_id}:{self.counter_type} {StatsCountersAccum.IntervalType(self.interval_type)} {self.v_count},{self.v_sum},{self.v_min},{self.v_max}'
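
Note the stamp arithmetic: group_by_stamp is floor(stamp / interval), so the stored stamp labels each bucket by the end of the interval it summarizes. A worked example for the HOUR interval:

interval = 3600
stamp = 1262566800 + 1234           # a raw counter stamp, some way into an hour
group_by_stamp = stamp // interval  # floor(stamp / 3600), as in the SQL above
accum_stamp = group_by_stamp * interval + interval
# accum_stamp == stamp - (stamp % 3600) + 3600, the same rounding the
# unit test applies when recomputing the expected hourly aggregates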