1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-26 16:25:06 +03:00

Merge pull request #8236 from ryanpetrello/more-callback-cleanup

refactor some callback receiver code

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot] 2020-09-28 15:20:29 +00:00 committed by GitHub
commit 36d4f255a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 33 deletions

View File

@ -11,7 +11,7 @@ import traceback
from django.conf import settings
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db.utils import InterfaceError, InternalError, IntegrityError
from django.db.utils import InterfaceError, InternalError
import psutil
@ -120,20 +120,12 @@ class CallbackBrokerWorker(BaseWorker):
e.modified = now
try:
cls.objects.bulk_create(events)
except Exception as exc:
except Exception:
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
# broken/stale (e.g., an IntegrityError on a specific event)
# broken/stale
for e in events:
try:
if (
isinstance(exc, IntegrityError) and
getattr(e, 'host_id', '')
):
# this is one potential IntegrityError we can
# work around - if the host disappears before
# the event can be processed
e.host_id = None
e.save()
except Exception:
logger.exception('Database Error Saving Job Event')

View File

@ -5,6 +5,7 @@ import logging
from collections import defaultdict
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models, DatabaseError, connection
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
@ -349,31 +350,36 @@ class BasePlaybookEvent(CreatedModifiedModel):
pass
if isinstance(self, JobEvent):
hostnames = self._hostnames()
self._update_host_summary_from_stats(set(hostnames))
if self.job.inventory:
try:
self.job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
try:
job = self.job
except ObjectDoesNotExist:
job = None
if job:
hostnames = self._hostnames()
self._update_host_summary_from_stats(set(hostnames))
if job.inventory:
try:
job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
# find parent links and progagate changed=T and failed=T
changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
# find parent links and progagate changed=T and failed=T
changed = job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)
# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks import handle_success_and_failure_notifications # circular import
# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks import handle_success_and_failure_notifications # circular import
def _send_notifications():
handle_success_and_failure_notifications.apply_async([self.job.id])
connection.on_commit(_send_notifications)
def _send_notifications():
handle_success_and_failure_notifications.apply_async([job.id])
connection.on_commit(_send_notifications)
for field in ('playbook', 'play', 'task', 'role'):
@ -497,7 +503,11 @@ class JobEvent(BasePlaybookEvent):
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
if not self.job or not self.job.inventory:
try:
if not self.job or not self.job.inventory:
logger.info('Event {} missing job or inventory, host summaries not updated'.format(self.pk))
return
except ObjectDoesNotExist:
logger.info('Event {} missing job or inventory, host summaries not updated'.format(self.pk))
return
job = self.job