
Merge pull request #6166 from fosterseth/feature-cleanup_jobs-perf

Improve performance of cleanup_jobs
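The previous implementation fetched each matching job and called .delete() on it individually, cascading row by row. This change removes jobs in batches (1,000,000 per pass) using a new AWXCollector that gathers related objects as querysets and deletes them with set-based queries. The command is invoked as before; a minimal sketch, mirroring the new tests below:

    from django.core.management import call_command
    call_command('cleanup_jobs', '--days', '10')  # remove jobs older than 10 days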

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
Committed by softwarefactory-project-zuul[bot] on 2020-03-19 20:09:39 +00:00 (commit 0a5acb6520).
3 changed files with 388 additions and 20 deletions

awx/main/management/commands/cleanup_jobs.py

@@ -21,6 +21,8 @@ from awx.main.signals import (
disable_computed_fields
)
from awx.main.management.commands.deletion import AWXCollector, pre_delete
class Command(BaseCommand):
'''
@@ -57,27 +59,37 @@ class Command(BaseCommand):
action='store_true', dest='only_workflow_jobs',
help='Remove workflow jobs')
def cleanup_jobs(self):
#jobs_qs = Job.objects.exclude(status__in=('pending', 'running'))
#jobs_qs = jobs_qs.filter(created__lte=self.cutoff)
skipped, deleted = 0, 0
jobs = Job.objects.filter(created__lt=self.cutoff)
for job in jobs.iterator():
job_display = '"%s" (%d host summaries, %d events)' % \
(str(job),
job.job_host_summaries.count(), job.job_events.count())
if job.status in ('pending', 'waiting', 'running'):
action_text = 'would skip' if self.dry_run else 'skipping'
self.logger.debug('%s %s job %s', action_text, job.status, job_display)
skipped += 1
else:
action_text = 'would delete' if self.dry_run else 'deleting'
self.logger.info('%s %s', action_text, job_display)
if not self.dry_run:
job.delete()
deleted += 1
skipped += Job.objects.filter(created__gte=self.cutoff).count()
def cleanup_jobs(self):
skipped, deleted = 0, 0
batch_size = 1000000
while True:
# get queryset for available jobs to remove
qs = Job.objects.filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
# get pk list for the first N (batch_size) objects
pk_list = qs[0:batch_size].values_list('pk')
# You cannot delete a sliced queryset (SQL LIMIT is set), so we must
# create a new query from this pk_list
qs_batch = Job.objects.filter(pk__in=pk_list)
just_deleted = 0
if not self.dry_run:
del_query = pre_delete(qs_batch)
collector = AWXCollector(del_query.db)
collector.collect(del_query)
_, models_deleted = collector.delete()
if models_deleted:
just_deleted = models_deleted['main.Job']
deleted += just_deleted
else:
just_deleted = 0 # break from loop, this is dry run
deleted = qs.count()
if just_deleted == 0:
break
skipped += (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
return skipped, deleted
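# Note on the pk_list indirection above: Django refuses to delete a sliced
# queryset (SQL DELETE takes no LIMIT), so each batch is materialized as
# primary keys and re-filtered. A minimal sketch of the pattern:
#   pks = list(qs[:batch_size].values_list('pk', flat=True))
#   Job.objects.filter(pk__in=pks)  # unsliced, so it can be collected and deleted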
def cleanup_ad_hoc_commands(self):

awx/main/management/commands/deletion.py (new file)

@@ -0,0 +1,177 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models.deletion import (
DO_NOTHING, Collector, get_candidate_relations_to_delete,
)
from collections import Counter, OrderedDict
from django.db import transaction
from django.db.models import sql
def bulk_related_objects(field, objs, using):
# This overrides the method in django.contrib.contenttypes.fields
"""
Return all objects related to ``objs`` via this ``GenericRelation``.
"""
return field.remote_field.model._base_manager.db_manager(using).filter(**{
"%s__pk" % field.content_type_field_name: ContentType.objects.db_manager(using).get_for_model(
field.model, for_concrete_model=field.for_concrete_model).pk,
"%s__in" % field.object_id_field_name: list(objs.values_list('pk', flat=True))
})
def pre_delete(qs):
# taken from the .delete method in django.db.models.query
assert qs.query.can_filter(), \
"Cannot use 'limit' or 'offset' with delete."
if qs._fields is not None:
raise TypeError("Cannot call delete() after .values() or .values_list()")
del_query = qs._chain()
# The delete is actually 2 queries - one to find related objects,
# and one to delete. Make sure that the discovery of related
# objects is performed on the same database as the deletion.
del_query._for_write = True
# Disable non-supported fields.
del_query.query.select_for_update = False
del_query.query.select_related = False
del_query.query.clear_ordering(force_empty=True)
return del_query
class AWXCollector(Collector):
def add(self, objs, source=None, nullable=False, reverse_dependency=False):
"""
Add 'objs' to the collection of objects to be deleted. If the call is
the result of a cascade, 'source' should be the model that caused it,
and 'nullable' should be set to True if the relation can be null.
Return a list of all objects that were not already collected.
"""
if not objs.exists():
return objs
model = objs.model
self.data.setdefault(model, [])
self.data[model].append(objs)
# Nullable relationships can be ignored -- they are nulled out before
# deleting, and therefore do not affect the order in which objects have
# to be deleted.
if source is not None and not nullable:
if reverse_dependency:
source, model = model, source
self.dependencies.setdefault(
source._meta.concrete_model, set()).add(model._meta.concrete_model)
return objs
def add_field_update(self, field, value, objs):
"""
Schedule a field update. 'objs' must be a homogeneous iterable
collection of model instances (e.g. a QuerySet).
"""
if not objs.exists():
return
model = objs.model
self.field_updates.setdefault(model, {})
self.field_updates[model].setdefault((field, value), [])
self.field_updates[model][(field, value)].append(objs)
def collect(self, objs, source=None, nullable=False, collect_related=True,
source_attr=None, reverse_dependency=False, keep_parents=False):
"""
Add 'objs' to the collection of objects to be deleted as well as all
parent instances. 'objs' must be a homogeneous iterable collection of
model instances (e.g. a QuerySet). If 'collect_related' is True,
related objects will be handled by their respective on_delete handler.
If the call is the result of a cascade, 'source' should be the model
that caused it and 'nullable' should be set to True, if the relation
can be null.
If 'reverse_dependency' is True, 'source' will be deleted before the
current model, rather than after. (Needed for cascading to parent
models, the one case in which the cascade follows the forwards
direction of an FK rather than the reverse direction.)
If 'keep_parents' is True, data of parent models will not be deleted.
"""
if hasattr(objs, 'polymorphic_disabled'):
objs.polymorphic_disabled = True
if self.can_fast_delete(objs):
self.fast_deletes.append(objs)
return
new_objs = self.add(objs, source, nullable,
reverse_dependency=reverse_dependency)
if not new_objs.exists():
return
model = new_objs.model
if not keep_parents:
# Recursively collect concrete model's parent models, but not their
# related objects. These will be found by meta.get_fields()
concrete_model = model._meta.concrete_model
for ptr in concrete_model._meta.parents.keys():
if ptr:
parent_objs = ptr.objects.filter(pk__in=new_objs.values_list('pk', flat=True))
self.collect(parent_objs, source=model,
collect_related=False,
reverse_dependency=True)
if collect_related:
parents = model._meta.parents
for related in get_candidate_relations_to_delete(model._meta):
# Preserve parent reverse relationships if keep_parents=True.
if keep_parents and related.model in parents:
continue
field = related.field
if field.remote_field.on_delete == DO_NOTHING:
continue
related_qs = self.related_objects(related, new_objs)
if self.can_fast_delete(related_qs, from_field=field):
self.fast_deletes.append(related_qs)
elif related_qs:
field.remote_field.on_delete(self, field, related_qs, self.using)
for field in model._meta.private_fields:
if hasattr(field, 'bulk_related_objects'):
# It's something like generic foreign key.
sub_objs = bulk_related_objects(field, new_objs, self.using)
self.collect(sub_objs, source=model, nullable=True)
def delete(self):
self.sort()
# collect pk_list before deletion (once things start to delete,
# queries might not be able to retrieve the pk list)
del_dict = OrderedDict()
for model, instances in self.data.items():
del_dict.setdefault(model, [])
for inst in instances:
del_dict[model] += list(inst.values_list('pk', flat=True))
deleted_counter = Counter()
with transaction.atomic(using=self.using, savepoint=False):
# update fields
for model, instances_for_fieldvalues in self.field_updates.items():
for (field, value), instances in instances_for_fieldvalues.items():
for inst in instances:
query = sql.UpdateQuery(model)
query.update_batch(inst.values_list('pk', flat=True),
{field.name: value}, self.using)
# fast deletes
for qs in self.fast_deletes:
count = qs._raw_delete(using=self.using)
deleted_counter[qs.model._meta.label] += count
# delete instances
for model, pk_list in del_dict.items():
query = sql.DeleteQuery(model)
count = query.delete_batch(pk_list, self.using)
deleted_counter[model._meta.label] += count
return sum(deleted_counter.values()), dict(deleted_counter)
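# Usage sketch, mirroring cleanup_jobs above (`cutoff` is an assumed aware
# datetime, not defined in this module):
#   from awx.main.models import Job
#   qs = Job.objects.filter(created__lt=cutoff)
#   del_query = pre_delete(qs)
#   collector = AWXCollector(del_query.db)
#   collector.collect(del_query)
#   total, per_model = collector.delete()  # per_model maps 'main.Job' to a count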

New test file for cleanup_jobs and AWXCollector

@@ -0,0 +1,179 @@
import pytest
from datetime import datetime, timedelta
from pytz import timezone
from collections import OrderedDict
from django.db.models.deletion import Collector, SET_NULL, CASCADE
from django.core.management import call_command
from awx.main.management.commands.deletion import AWXCollector
from awx.main.models import (
JobTemplate, User, Job, JobEvent, Notification,
WorkflowJobNode, JobHostSummary
)
@pytest.fixture
def setup_environment(inventory, project, machine_credential, host, notification_template, label):
'''
Create old jobs and new jobs, with various other objects to hit the
related fields of Jobs. This makes sure on_delete() effects are tested
properly.
'''
old_jobs = []
new_jobs = []
days = 10
days_str = str(days)
jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project)
jt.credentials.add(machine_credential)
jt_user = User.objects.create(username='jobtemplateuser')
jt.execute_role.members.add(jt_user)
notification = Notification()
notification.notification_template = notification_template
notification.save()
for i in range(3):
job1 = jt.create_job()
job1.created = datetime.now(tz=timezone('UTC'))
job1.save()
# create jobs with current time
JobEvent.create_from_data(job_id=job1.pk, uuid='abc123', event='runner_on_start',
stdout='a' * 1025).save()
new_jobs.append(job1)
job2 = jt.create_job()
# create jobs 10 days ago
job2.created = datetime.now(tz=timezone('UTC')) - timedelta(days=days)
job2.save()
job2.dependent_jobs.add(job1)
JobEvent.create_from_data(job_id=job2.pk, uuid='abc123', event='runner_on_start',
stdout='a' * 1025).save()
old_jobs.append(job2)
jt.last_job = job2
jt.current_job = job2
jt.save()
host.last_job = job2
host.save()
notification.unifiedjob_notifications.add(job2)
label.unifiedjob_labels.add(job2)
jn = WorkflowJobNode.objects.create(job=job2)
jn.save()
jh = JobHostSummary.objects.create(job=job2)
jh.save()
return (old_jobs, new_jobs, days_str)
@pytest.mark.django_db
def test_cleanup_jobs(setup_environment):
(old_jobs, new_jobs, days_str) = setup_environment
# related_fields
related = [f for f in Job._meta.get_fields(include_hidden=True)
if f.auto_created and not f.concrete and
(f.one_to_one or f.one_to_many)]
job = old_jobs[-1] # last job
# gather related objects for job
related_should_be_removed = {}
related_should_be_null = {}
for r in related:
qs = r.related_model._base_manager.using('default').filter(
**{"%s__in" % r.field.name: [job.pk]}
)
if qs.exists():
if r.field.remote_field.on_delete == CASCADE:
related_should_be_removed[qs.model] = set(qs.values_list('pk', flat=True))
if r.field.remote_field.on_delete == SET_NULL:
related_should_be_null[(qs.model, r.field.name)] = set(qs.values_list('pk', flat=True))
assert related_should_be_removed
assert related_should_be_null
call_command('cleanup_jobs', '--days', days_str)
# make sure old jobs are removed
assert not Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]).exists()
# make sure new jobs are untouched
assert len(new_jobs) == Job.objects.filter(pk__in=[obj.pk for obj in new_jobs]).count()
# make sure related objects are destroyed or set to NULL (none)
for model, values in related_should_be_removed.items():
assert not model.objects.filter(pk__in=values).exists()
for (model, fieldname), values in related_should_be_null.items():
for v in values:
assert not getattr(model.objects.get(pk=v), fieldname)
@pytest.mark.django_db
def test_awxcollector(setup_environment):
'''
Efforts to improve the performance of cleanup_jobs involved
subclassing Django's Collector class. This unit test checks
for parity between Django's Collector and the modified
AWXCollector class. AWXCollector is used in cleanup_jobs to
bulk-delete old jobs from the database.
Specifically, Collector has four dictionaries to check:
.dependencies, .data, .fast_deletes, and .field_updates
This test converts each dictionary from AWXCollector (after
running .collect on jobs) from querysets to sets of objects.
The final result should be a dictionary that is equivalent to
the one built by Django's Collector.
'''
(old_jobs, new_jobs, days_str) = setup_environment
collector = Collector('default')
collector.collect(old_jobs)
awx_col = AWXCollector('default')
# awx_col accepts a queryset as input
awx_col.collect(Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]))
# check that dependencies are the same
assert awx_col.dependencies == collector.dependencies
# check that objects to delete are the same
awx_del_dict = OrderedDict()
for model, instances in awx_col.data.items():
awx_del_dict.setdefault(model, set())
for inst in instances:
# .update() will put each object in a queryset into the set
awx_del_dict[model].update(inst)
assert awx_del_dict == collector.data
# check that field updates are the same
awx_del_dict = OrderedDict()
for model, instances_for_fieldvalues in awx_col.field_updates.items():
awx_del_dict.setdefault(model, {})
for (field, value), instances in instances_for_fieldvalues.items():
awx_del_dict[model].setdefault((field, value), set())
for inst in instances:
awx_del_dict[model][(field, value)].update(inst)
# collector field updates don't use the base (polymorphic parent) model, e.g.
# it will use JobTemplate instead of UnifiedJobTemplate. Therefore,
# we need to rebuild the dictionary and grab the model from the field
collector_del_dict = OrderedDict()
for model, instances_for_fieldvalues in collector.field_updates.items():
for (field, value), instances in instances_for_fieldvalues.items():
collector_del_dict.setdefault(field.model, {})
collector_del_dict[field.model][(field, value)] = collector.field_updates[model][(field, value)]
assert awx_del_dict == collector_del_dict
# check that fast deletes are the same
collector_fast_deletes = set()
for q in collector.fast_deletes:
collector_fast_deletes.update(q)
awx_col_fast_deletes = set()
for q in awx_col.fast_deletes:
awx_col_fast_deletes.update(q)
assert collector_fast_deletes == awx_col_fast_deletes