From f59f47435b3d0fbe961f5ce494e1d61f16b35a62 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Wed, 21 Mar 2018 17:43:26 -0400
Subject: [PATCH] send job notification templates _after_ all events have been
 processed

see: https://github.com/ansible/awx/issues/500
---
Reviewer notes (free-text area after `---`; not part of the commit message):

* run_callback_receiver.py: the EOF branch still emits the `jobs-summary`
  websocket message, then loads the UnifiedJob and, if it has
  send_notification_templates(), checks `uj.finished` up to 5 times with a
  1 second sleep and a re-fetch between checks. If the job is still not
  finished after the fifth check, no notification is sent. Any exception
  raised in the branch is logged with logger.exception() and the worker
  continues with the next message.
* notifications.py / tasks.py: the module-level helper
  _send_notification_templates(instance, status_str) is removed from
  awx/main/tasks.py and re-added as
  JobNotificationMixin.send_notification_templates(status_str);
  `send_notifications` is imported inside the method to avoid a circular
  import. handle_work_success() and handle_work_error() no longer send
  notifications themselves.
* task_manager.py: the scheduler calls the new method for tasks it marks as
  failed because they have no record in celery (guarded by hasattr) and for
  finished workflow jobs (unguarded).
* test_scheduler.py: the reaper test patches
  JobNotificationMixin.send_notification_templates instead of
  awx.main.tasks._send_notification_templates, so the expected calls lose
  the leading job argument.

 .../commands/run_callback_receiver.py         | 37 +++++++++++++++----
 awx/main/models/notifications.py              | 24 ++++++++++++
 awx/main/scheduler/task_manager.py            |  6 +--
 awx/main/tasks.py                             | 29 ---------------
 .../task_management/test_scheduler.py         |  5 ++-
 5 files changed, 59 insertions(+), 42 deletions(-)

diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index fbbcb4f917..7bb33b033c 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -161,14 +161,35 @@ class CallbackBrokerWorker(ConsumerMixin):
                         break
 
                 if body.get('event') == 'EOF':
-                    # EOF events are sent when stdout for the running task is
-                    # closed. don't actually persist them to the database; we
-                    # just use them to report `summary` websocket events as an
-                    # approximation for when a job is "done"
-                    emit_channel_notification(
-                        'jobs-summary',
-                        dict(group_name='jobs', unified_job_id=job_identifier)
-                    )
+                    try:
+                        logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
+                        # EOF events are sent when stdout for the running task is
+                        # closed. don't actually persist them to the database; we
+                        # just use them to report `summary` websocket events as an
+                        # approximation for when a job is "done"
+                        emit_channel_notification(
+                            'jobs-summary',
+                            dict(group_name='jobs', unified_job_id=job_identifier)
+                        )
+                        # Additionally, when we've processed all events, we should
+                        # have all the data we need to send out success/failure
+                        # notification templates
+                        uj = UnifiedJob.objects.get(pk=job_identifier)
+                        if hasattr(uj, 'send_notification_templates'):
+                            retries = 0
+                            while retries < 5:
+                                if uj.finished:
+                                    uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
+                                    break
+                                else:
+                                    # wait a few seconds to avoid a race where the
+                                    # events are persisted _before_ the UJ.status
+                                    # changes from running -> successful
+                                    retries += 1
+                                    time.sleep(1)
+                                    uj = UnifiedJob.objects.get(pk=job_identifier)
+                    except Exception:
+                        logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
                     continue
 
                 retries = 0
diff --git a/awx/main/models/notifications.py b/awx/main/models/notifications.py
index db445f1852..b636b7f8d7 100644
--- a/awx/main/models/notifications.py
+++ b/awx/main/models/notifications.py
@@ -209,3 +209,27 @@ class JobNotificationMixin(object):
 
     def build_notification_failed_message(self):
         return self._build_notification_message('failed')
+
+    def send_notification_templates(self, status_str):
+        from awx.main.tasks import send_notifications  # avoid circular import
+        if status_str not in ['succeeded', 'failed']:
+            raise ValueError(_("status_str must be either succeeded or failed"))
+        try:
+            notification_templates = self.get_notification_templates()
+        except Exception:
+            logger.warn("No notification template defined for emitting notification")
+            notification_templates = None
+        if notification_templates:
+            if status_str == 'succeeded':
+                notification_template_type = 'success'
+            else:
+                notification_template_type = 'error'
+            all_notification_templates = set(notification_templates.get(notification_template_type, []) + notification_templates.get('any', []))
+            if len(all_notification_templates):
+                try:
+                    (notification_subject, notification_body) = getattr(self, 'build_notification_%s_message' % status_str)()
+                except AttributeError:
+                    raise NotImplementedError("build_notification_%s_message() does not exist" % status_str)
+                send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
+                                          for n in all_notification_templates],
+                                         job_id=self.id)
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 861d098a02..254b9472d3 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -38,7 +38,6 @@ from awx.main.utils import get_type_for_model
 from awx.main.signals import disable_activity_stream
 
 from awx.main.scheduler.dependency_graph import DependencyGraph
-from awx.main import tasks as awx_tasks
 from awx.main.utils import decrypt_field
 
 # Celery
@@ -499,7 +498,8 @@ class TaskManager():
                 except DatabaseError:
                     logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                     continue
-                awx_tasks._send_notification_templates(task, 'failed')
+                if hasattr(task, 'send_notification_templates'):
+                    task.send_notification_templates('failed')
                 task.websocket_emit_status(new_status)
                 logger.error("{}Task {} has no record in celery. Marking as failed".format(
                     'Isolated ' if isolated else '', task.log_format))
@@ -630,4 +630,4 @@ class TaskManager():
 
             # Operations whose queries rely on modifications made during the atomic scheduling session
             for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
-                awx_tasks._send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
+                wfj.send_notification_templates('succeeded' if wfj.status == 'successful' else 'failed')
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 14001296aa..fc8fd6e4cc 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -430,30 +430,6 @@ def awx_periodic_scheduler(self):
     state.save()
 
 
-def _send_notification_templates(instance, status_str):
-    if status_str not in ['succeeded', 'failed']:
-        raise ValueError(_("status_str must be either succeeded or failed"))
-    try:
-        notification_templates = instance.get_notification_templates()
-    except Exception:
-        logger.warn("No notification template defined for emitting notification")
-        notification_templates = None
-    if notification_templates:
-        if status_str == 'succeeded':
-            notification_template_type = 'success'
-        else:
-            notification_template_type = 'error'
-        all_notification_templates = set(notification_templates.get(notification_template_type, []) + notification_templates.get('any', []))
-        if len(all_notification_templates):
-            try:
-                (notification_subject, notification_body) = getattr(instance, 'build_notification_%s_message' % status_str)()
-            except AttributeError:
-                raise NotImplementedError("build_notification_%s_message() does not exist" % status_str)
-            send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
-                                      for n in all_notification_templates],
-                                     job_id=instance.id)
-
-
 @shared_task(bind=True, queue='tower', base=LogErrorsTask)
 def handle_work_success(self, result, task_actual):
     try:
@@ -464,8 +440,6 @@ def handle_work_success(self, result, task_actual):
     if not instance:
         return
 
-    _send_notification_templates(instance, 'succeeded')
-
     from awx.main.scheduler.tasks import run_job_complete
     run_job_complete.delay(instance.id)
 
@@ -501,9 +475,6 @@ def handle_work_error(task_id, *args, **kwargs):
                 instance.save()
                 instance.websocket_emit_status("failed")
 
-    if first_instance:
-        _send_notification_templates(first_instance, 'failed')
-
     # We only send 1 job complete message since all the job completion message
     # handling does is trigger the scheduler. If we extend the functionality of
     # what the job complete message handler does then we may want to send a
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index d32f9daa2b..1d87417b17 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -13,6 +13,7 @@ from awx.main.models import (
     Instance,
     WorkflowJob,
 )
+from awx.main.models.notifications import JobNotificationMixin
 
 
 @pytest.mark.django_db
@@ -323,7 +324,7 @@ class TestReaper():
         })
 
     @pytest.mark.django_db
-    @mock.patch('awx.main.tasks._send_notification_templates')
+    @mock.patch.object(JobNotificationMixin, 'send_notification_templates')
     @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
     def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker):
         tm = TaskManager()
@@ -338,7 +339,7 @@ class TestReaper():
                 j.save.assert_not_called()
 
         assert notify.call_count == 4
-        notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
+        notify.assert_has_calls([mock.call('failed') for j in reapable_jobs], any_order=True)
 
         for j in reapable_jobs:
             j.websocket_emit_status.assert_called_once_with('failed')