From efbdb67c33a0a3fd4b337502f5b0eb4a3c3c0c9e Mon Sep 17 00:00:00 2001 From: Chris Church Date: Wed, 7 Aug 2013 02:42:27 -0400 Subject: [PATCH] Work on AC-205. Filter job events that are internal as a result of async polling, update event display name for various async job events. --- awx/main/access.py | 7 +- awx/main/models/__init__.py | 41 +++++-- awx/main/tests/tasks.py | 212 +++++++++++++++++++++++++++++++++++- 3 files changed, 245 insertions(+), 15 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index a42e99c108..88bf3dd4af 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -894,13 +894,18 @@ class JobEventAccess(BaseAccess): def get_queryset(self): qs = self.model.objects.distinct() + + # Filter certain "internal" events generating by async polling. + qs = qs.exclude(event__in=('runner_on_ok', 'runner_on_failed'), + event_data__icontains='"ansible_job_id": "', + event_data__contains='"module_name": "async_status"') + if self.user.is_superuser: return qs job_qs = self.user.get_queryset(Job) host_qs = self.user.get_queryset(Host) qs = qs.filter(Q(host__isnull=True) | Q(host__in=host_qs), job__in=job_qs) - # FIXME: Filter certain extra events from async polling? return qs def can_add(self, data): diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 7b26ab4e0d..28e6849727 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -1032,8 +1032,8 @@ class JobEvent(models.Model): (3, 'runner_on_unreachable', _('Host Unreachable'), True), (3, 'runner_on_no_hosts', _('No Hosts Remaining'), False), (3, 'runner_on_async_poll', _('Host Polling'), False), - (3, 'runner_on_async_ok', _('Host OK'), False), - (3, 'runner_on_async_failed', _('Host Failure'), True), + (3, 'runner_on_async_ok', _('Host Async OK'), False), + (3, 'runner_on_async_failed', _('Host Async Failure'), True), # AWX does not yet support --diff mode (3, 'runner_on_file_diff', _('File Difference'), False), (0, 'playbook_on_start', _('Playbook Started'), False), @@ -1140,9 +1140,20 @@ class JobEvent(models.Model): if self.task is not None: msg = "%s (%s)" % (msg, self.task) - # Change display for runner events trigged by async polling. + # Change display for runner events trigged by async polling. Some of + # these events may not show in most cases, due to filterting them out + # of the job event queryset returned to the user. + res = self.event_data.get('res', {}) + # Fix for existing records before we had added the workaround on save + # to change async_ok to async_failed. + if self.event == 'runner_on_async_ok': + try: + if res.get('failed', False) or res.get('rc', 0) != 0: + msg = 'Host Async Failed' + except (AttributeError, TypeError): + pass + # Runner events with ansible_job_id are part of async starting/polling. if self.event in ('runner_on_ok', 'runner_on_failed'): - res = self.event_data.get('res', {}) try: module_name = res['invocation']['module_name'] job_id = res['ansible_job_id'] @@ -1154,9 +1165,14 @@ class JobEvent(models.Model): msg = 'Host Async Checking' else: msg = 'Host Async Started' - #msg = '%s %s %s' % (msg, module_name, job_id) - #msg = '%s [%s]' % (msg, self.event) - + # Handle both 1.2 on_failed and 1.3+ on_async_failed events when an + # async task times out. + if self.event in ('runner_on_failed', 'runner_on_async_failed'): + try: + if res['msg'] == 'timed out': + msg = 'Host Async Timeout' + except (TypeError, KeyError, AttributeError): + pass return msg def _find_parent(self): @@ -1187,9 +1203,18 @@ class JobEvent(models.Model): return None def save(self, *args, **kwargs): + res = self.event_data.get('res', None) + # Workaround for Ansible 1.2, where the runner_on_async_ok event is + # created even when the async task failed. Change the event to be + # correct. + if self.event == 'runner_on_async_ok': + try: + if res.get('failed', False) or res.get('rc', 0) != 0: + self.event = 'runner_on_async_failed' + except (AttributeError, TypeError): + pass if self.event in self.FAILED_EVENTS: self.failed = True - res = self.event_data.get('res', None) if isinstance(res, dict) and res.get('changed', False): self.changed = True if self.event == 'playbook_on_stats': diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py index 0974ef21fe..3b21108ad4 100644 --- a/awx/main/tests/tasks.py +++ b/awx/main/tests/tasks.py @@ -26,6 +26,46 @@ TEST_PLAYBOOK2 = '''- hosts: test-group command: test 1 = 0 ''' +TEST_ASYNC_OK_PLAYBOOK = ''' +- hosts: test-group + gather_facts: false + tasks: + - name: async task should pass + command: sleep 4 + async: 8 + poll: 1 +''' + +TEST_ASYNC_FAIL_PLAYBOOK = ''' +- hosts: test-group + gather_facts: false + tasks: + - name: async task should fail + shell: sleep 6; test 1 = 0 + async: 8 + poll: 1 +''' + +TEST_ASYNC_TIMEOUT_PLAYBOOK = ''' +- hosts: test-group + gather_facts: false + tasks: + - name: async task should timeout + command: sleep 12 + async: 8 + poll: 1 +''' + +TEST_ASYNC_NOWAIT_PLAYBOOK = ''' +- hosts: test-group + gather_facts: false + tasks: + - name: async task should run in background + command: sleep 4 + async: 8 + poll: 0 +''' + TEST_SSH_KEY_DATA = '''-----BEGIN RSA PRIVATE KEY----- MIIEpQIBAAKCAQEAyQ8F5bbgjHvk4SZJsKI9OmJKMFxZqRhvx4LaqjLTKbBwRBsY 1/C00NPiZn70dKbeyV7RNVZxuzM6yd3D3lwTdbDu/eJ0x72t3ch+TdLt/aenyy10 @@ -215,11 +255,21 @@ class RunJobTest(BaseCeleryTest): 'expected no traceback, got:\n%s' % job.result_traceback) - def check_job_events(self, job, runner_status='ok', plays=1, tasks=1): + def check_job_events(self, job, runner_status='ok', plays=1, tasks=1, + async=False, async_timeout=False, async_nowait=False): job_events = job.job_events.all() + if False and async: + print + qs = self.super_django_user.get_queryset(JobEvent) + for je in qs.filter(job=job): + print je.get_event_display2() + print je.event, je, je.failed + print je.event_data + print for job_event in job_events: unicode(job_event) # For test coverage. job_event.save() + job_event.get_event_display2() should_be_failed = bool(runner_status not in ('ok', 'skipped')) should_be_changed = bool(runner_status in ('ok', 'failed') and job.job_type == 'run') @@ -231,7 +281,8 @@ class RunJobTest(BaseCeleryTest): self.assertFalse(evt.play, evt) self.assertFalse(evt.task, evt) self.assertEqual(evt.failed, should_be_failed) - self.assertEqual(evt.changed, should_be_changed) + if not async: + self.assertEqual(evt.changed, should_be_changed) self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), host_pks) qs = job_events.filter(event='playbook_on_play_start') @@ -241,7 +292,8 @@ class RunJobTest(BaseCeleryTest): self.assertTrue(evt.play, evt) self.assertFalse(evt.task, evt) self.assertEqual(evt.failed, should_be_failed) - self.assertEqual(evt.changed, should_be_changed) + if not async: + self.assertEqual(evt.changed, should_be_changed) self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), host_pks) qs = job_events.filter(event='playbook_on_task_start') @@ -251,21 +303,56 @@ class RunJobTest(BaseCeleryTest): self.assertTrue(evt.play, evt) self.assertTrue(evt.task, evt) self.assertEqual(evt.failed, should_be_failed) - self.assertEqual(evt.changed, should_be_changed) + if not async: + self.assertEqual(evt.changed, should_be_changed) self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), host_pks) qs = job_events.filter(event=('runner_on_%s' % runner_status)) - self.assertEqual(qs.count(), tasks) + if async and async_timeout: + pass + elif async: + self.assertTrue(qs.count()) + else: + self.assertEqual(qs.count(), tasks) for evt in qs: self.assertEqual(evt.host, self.host) self.assertTrue(evt.play, evt) self.assertTrue(evt.task, evt) self.assertEqual(evt.failed, should_be_failed) - self.assertEqual(evt.changed, should_be_changed) + if not async: + self.assertEqual(evt.changed, should_be_changed) self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), host_pks) + if async: + qs = job_events.filter(event='runner_on_async_poll') + if not async_nowait: + self.assertTrue(qs.count()) + for evt in qs: + self.assertEqual(evt.host, self.host) + self.assertTrue(evt.play, evt) + self.assertTrue(evt.task, evt) + self.assertEqual(evt.failed, False)#should_be_failed) + self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), + host_pks) + qs = job_events.filter(event=('runner_on_async_%s' % runner_status)) + # Ansible 1.2 won't call the on_runner_async_failed callback when a + # timeout occurs, so skip this check for now. + if not async_timeout and not async_nowait: + self.assertEqual(qs.count(), tasks) + for evt in qs: + self.assertEqual(evt.host, self.host) + self.assertTrue(evt.play, evt) + self.assertTrue(evt.task, evt) + self.assertEqual(evt.failed, should_be_failed) + self.assertEqual(set(evt.hosts.values_list('pk', flat=True)), + host_pks) qs = job_events.filter(event__startswith='runner_') qs = qs.exclude(event=('runner_on_%s' % runner_status)) + if async: + if runner_status == 'failed': + qs = qs.exclude(event='runner_on_ok') + qs = qs.exclude(event='runner_on_async_poll') + qs = qs.exclude(event=('runner_on_async_%s' % runner_status)) self.assertEqual(qs.count(), 0) def test_run_job(self): @@ -643,3 +730,116 @@ class RunJobTest(BaseCeleryTest): self.check_job_result(job, 'successful') self.assertTrue('ssh-agent' in self.run_job_args) self.assertTrue('Bad passphrase' not in job.result_stdout) + + def test_run_async_job(self): + self.create_test_project(TEST_ASYNC_OK_PLAYBOOK) + job_template = self.create_test_job_template() + job = self.create_test_job(job_template=job_template) + self.assertEqual(job.status, 'new') + self.assertFalse(job.get_passwords_needed_to_start()) + self.assertTrue(job.start()) + self.assertEqual(job.status, 'pending') + job = Job.objects.get(pk=job.pk) + self.check_job_result(job, 'successful') + self.check_job_events(job, 'ok', 1, 1, async=True) + for job_host_summary in job.job_host_summaries.all(): + self.assertFalse(job_host_summary.failed) + self.assertEqual(job_host_summary.host.last_job_host_summary, + job_host_summary) + self.host = Host.objects.get(pk=self.host.pk) + self.assertEqual(self.host.last_job, job) + self.assertFalse(self.host.has_active_failures) + for group in self.host.all_groups: + self.assertFalse(group.has_active_failures) + self.assertFalse(self.host.inventory.has_active_failures) + self.assertEqual(job.successful_hosts.count(), 1) + self.assertEqual(job.failed_hosts.count(), 0) + self.assertEqual(job.changed_hosts.count(), 1) + self.assertEqual(job.unreachable_hosts.count(), 0) + self.assertEqual(job.skipped_hosts.count(), 0) + self.assertEqual(job.processed_hosts.count(), 1) + + def test_run_async_job_that_fails(self): + self.create_test_project(TEST_ASYNC_FAIL_PLAYBOOK) + job_template = self.create_test_job_template() + job = self.create_test_job(job_template=job_template) + self.assertEqual(job.status, 'new') + self.assertFalse(job.get_passwords_needed_to_start()) + self.assertTrue(job.start()) + self.assertEqual(job.status, 'pending') + job = Job.objects.get(pk=job.pk) + self.check_job_result(job, 'failed') + self.check_job_events(job, 'failed', 1, 1, async=True) + for job_host_summary in job.job_host_summaries.all(): + self.assertTrue(job_host_summary.failed) + self.assertEqual(job_host_summary.host.last_job_host_summary, + job_host_summary) + self.host = Host.objects.get(pk=self.host.pk) + self.assertEqual(self.host.last_job, job) + self.assertTrue(self.host.has_active_failures) + for group in self.host.all_groups: + self.assertTrue(group.has_active_failures) + self.assertTrue(self.host.inventory.has_active_failures) + self.assertEqual(job.successful_hosts.count(), 1) # FIXME: Is this right? + self.assertEqual(job.failed_hosts.count(), 1) + self.assertEqual(job.changed_hosts.count(), 0) + self.assertEqual(job.unreachable_hosts.count(), 0) + self.assertEqual(job.skipped_hosts.count(), 0) + self.assertEqual(job.processed_hosts.count(), 1) + + def test_run_async_job_that_times_out(self): + self.create_test_project(TEST_ASYNC_TIMEOUT_PLAYBOOK) + job_template = self.create_test_job_template() + job = self.create_test_job(job_template=job_template) + self.assertEqual(job.status, 'new') + self.assertFalse(job.get_passwords_needed_to_start()) + self.assertTrue(job.start()) + self.assertEqual(job.status, 'pending') + job = Job.objects.get(pk=job.pk) + self.check_job_result(job, 'failed') + self.check_job_events(job, 'failed', 1, 1, async=True, + async_timeout=True) + for job_host_summary in job.job_host_summaries.all(): + self.assertTrue(job_host_summary.failed) + self.assertEqual(job_host_summary.host.last_job_host_summary, + job_host_summary) + self.host = Host.objects.get(pk=self.host.pk) + self.assertEqual(self.host.last_job, job) + self.assertTrue(self.host.has_active_failures) + for group in self.host.all_groups: + self.assertTrue(group.has_active_failures) + self.assertTrue(self.host.inventory.has_active_failures) + self.assertEqual(job.successful_hosts.count(), 1) # FIXME: Is this right? + self.assertEqual(job.failed_hosts.count(), 1) + self.assertEqual(job.changed_hosts.count(), 0) + self.assertEqual(job.unreachable_hosts.count(), 0) + self.assertEqual(job.skipped_hosts.count(), 0) + self.assertEqual(job.processed_hosts.count(), 1) + + def test_run_async_job_fire_and_forget(self): + self.create_test_project(TEST_ASYNC_NOWAIT_PLAYBOOK) + job_template = self.create_test_job_template() + job = self.create_test_job(job_template=job_template) + self.assertEqual(job.status, 'new') + self.assertFalse(job.get_passwords_needed_to_start()) + self.assertTrue(job.start()) + self.assertEqual(job.status, 'pending') + job = Job.objects.get(pk=job.pk) + self.check_job_result(job, 'successful') + self.check_job_events(job, 'ok', 1, 1, async=True, async_nowait=True) + for job_host_summary in job.job_host_summaries.all(): + self.assertFalse(job_host_summary.failed) + self.assertEqual(job_host_summary.host.last_job_host_summary, + job_host_summary) + self.host = Host.objects.get(pk=self.host.pk) + self.assertEqual(self.host.last_job, job) + self.assertFalse(self.host.has_active_failures) + for group in self.host.all_groups: + self.assertFalse(group.has_active_failures) + self.assertFalse(self.host.inventory.has_active_failures) + self.assertEqual(job.successful_hosts.count(), 1) + self.assertEqual(job.failed_hosts.count(), 0) + self.assertEqual(job.changed_hosts.count(), 0) + self.assertEqual(job.unreachable_hosts.count(), 0) + self.assertEqual(job.skipped_hosts.count(), 0) + self.assertEqual(job.processed_hosts.count(), 1)