mirror of
https://github.com/ansible/awx.git
synced 2024-11-04 12:51:18 +03:00
Work on AC-205. Filter job events that are internal as a result of async polling, update event display name for various async job events.
This commit is contained in:
parent
77c2449276
commit
efbdb67c33
@ -894,13 +894,18 @@ class JobEventAccess(BaseAccess):
|
||||
|
||||
def get_queryset(self):
|
||||
qs = self.model.objects.distinct()
|
||||
|
||||
# Filter certain "internal" events generated by async polling.
|
||||
qs = qs.exclude(event__in=('runner_on_ok', 'runner_on_failed'),
|
||||
event_data__icontains='"ansible_job_id": "',
|
||||
event_data__contains='"module_name": "async_status"')
|
||||
|
||||
if self.user.is_superuser:
|
||||
return qs
|
||||
job_qs = self.user.get_queryset(Job)
|
||||
host_qs = self.user.get_queryset(Host)
|
||||
qs = qs.filter(Q(host__isnull=True) | Q(host__in=host_qs),
|
||||
job__in=job_qs)
|
||||
# FIXME: Filter certain extra events from async polling?
|
||||
return qs
|
||||
|
||||
def can_add(self, data):
|
||||
|
@ -1032,8 +1032,8 @@ class JobEvent(models.Model):
|
||||
(3, 'runner_on_unreachable', _('Host Unreachable'), True),
|
||||
(3, 'runner_on_no_hosts', _('No Hosts Remaining'), False),
|
||||
(3, 'runner_on_async_poll', _('Host Polling'), False),
|
||||
(3, 'runner_on_async_ok', _('Host OK'), False),
|
||||
(3, 'runner_on_async_failed', _('Host Failure'), True),
|
||||
(3, 'runner_on_async_ok', _('Host Async OK'), False),
|
||||
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
|
||||
# AWX does not yet support --diff mode
|
||||
(3, 'runner_on_file_diff', _('File Difference'), False),
|
||||
(0, 'playbook_on_start', _('Playbook Started'), False),
|
||||
@ -1140,9 +1140,20 @@ class JobEvent(models.Model):
|
||||
if self.task is not None:
|
||||
msg = "%s (%s)" % (msg, self.task)
|
||||
|
||||
# Change display for runner events triggered by async polling.
|
||||
# Change display for runner events triggered by async polling. Some of
|
||||
# these events may not show in most cases, due to filtering them out
|
||||
# of the job event queryset returned to the user.
|
||||
res = self.event_data.get('res', {})
|
||||
# Fix for existing records before we had added the workaround on save
|
||||
# to change async_ok to async_failed.
|
||||
if self.event == 'runner_on_async_ok':
|
||||
try:
|
||||
if res.get('failed', False) or res.get('rc', 0) != 0:
|
||||
msg = 'Host Async Failed'
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
# Runner events with ansible_job_id are part of async starting/polling.
|
||||
if self.event in ('runner_on_ok', 'runner_on_failed'):
|
||||
res = self.event_data.get('res', {})
|
||||
try:
|
||||
module_name = res['invocation']['module_name']
|
||||
job_id = res['ansible_job_id']
|
||||
@ -1154,9 +1165,14 @@ class JobEvent(models.Model):
|
||||
msg = 'Host Async Checking'
|
||||
else:
|
||||
msg = 'Host Async Started'
|
||||
#msg = '%s %s %s' % (msg, module_name, job_id)
|
||||
#msg = '%s [%s]' % (msg, self.event)
|
||||
|
||||
# Handle both 1.2 on_failed and 1.3+ on_async_failed events when an
|
||||
# async task times out.
|
||||
if self.event in ('runner_on_failed', 'runner_on_async_failed'):
|
||||
try:
|
||||
if res['msg'] == 'timed out':
|
||||
msg = 'Host Async Timeout'
|
||||
except (TypeError, KeyError, AttributeError):
|
||||
pass
|
||||
return msg
|
||||
|
||||
def _find_parent(self):
|
||||
@ -1187,9 +1203,18 @@ class JobEvent(models.Model):
|
||||
return None
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
res = self.event_data.get('res', None)
|
||||
# Workaround for Ansible 1.2, where the runner_on_async_ok event is
|
||||
# created even when the async task failed. Change the event to be
|
||||
# correct.
|
||||
if self.event == 'runner_on_async_ok':
|
||||
try:
|
||||
if res.get('failed', False) or res.get('rc', 0) != 0:
|
||||
self.event = 'runner_on_async_failed'
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
if self.event in self.FAILED_EVENTS:
|
||||
self.failed = True
|
||||
res = self.event_data.get('res', None)
|
||||
if isinstance(res, dict) and res.get('changed', False):
|
||||
self.changed = True
|
||||
if self.event == 'playbook_on_stats':
|
||||
|
@ -26,6 +26,46 @@ TEST_PLAYBOOK2 = '''- hosts: test-group
|
||||
command: test 1 = 0
|
||||
'''
|
||||
|
||||
TEST_ASYNC_OK_PLAYBOOK = '''
|
||||
- hosts: test-group
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: async task should pass
|
||||
command: sleep 4
|
||||
async: 8
|
||||
poll: 1
|
||||
'''
|
||||
|
||||
TEST_ASYNC_FAIL_PLAYBOOK = '''
|
||||
- hosts: test-group
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: async task should fail
|
||||
shell: sleep 6; test 1 = 0
|
||||
async: 8
|
||||
poll: 1
|
||||
'''
|
||||
|
||||
TEST_ASYNC_TIMEOUT_PLAYBOOK = '''
|
||||
- hosts: test-group
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: async task should timeout
|
||||
command: sleep 12
|
||||
async: 8
|
||||
poll: 1
|
||||
'''
|
||||
|
||||
TEST_ASYNC_NOWAIT_PLAYBOOK = '''
|
||||
- hosts: test-group
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: async task should run in background
|
||||
command: sleep 4
|
||||
async: 8
|
||||
poll: 0
|
||||
'''
|
||||
|
||||
TEST_SSH_KEY_DATA = '''-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEpQIBAAKCAQEAyQ8F5bbgjHvk4SZJsKI9OmJKMFxZqRhvx4LaqjLTKbBwRBsY
|
||||
1/C00NPiZn70dKbeyV7RNVZxuzM6yd3D3lwTdbDu/eJ0x72t3ch+TdLt/aenyy10
|
||||
@ -215,11 +255,21 @@ class RunJobTest(BaseCeleryTest):
|
||||
'expected no traceback, got:\n%s' %
|
||||
job.result_traceback)
|
||||
|
||||
def check_job_events(self, job, runner_status='ok', plays=1, tasks=1):
|
||||
def check_job_events(self, job, runner_status='ok', plays=1, tasks=1,
|
||||
async=False, async_timeout=False, async_nowait=False):
|
||||
job_events = job.job_events.all()
|
||||
if False and async:
|
||||
print
|
||||
qs = self.super_django_user.get_queryset(JobEvent)
|
||||
for je in qs.filter(job=job):
|
||||
print je.get_event_display2()
|
||||
print je.event, je, je.failed
|
||||
print je.event_data
|
||||
print
|
||||
for job_event in job_events:
|
||||
unicode(job_event) # For test coverage.
|
||||
job_event.save()
|
||||
job_event.get_event_display2()
|
||||
should_be_failed = bool(runner_status not in ('ok', 'skipped'))
|
||||
should_be_changed = bool(runner_status in ('ok', 'failed') and
|
||||
job.job_type == 'run')
|
||||
@ -231,7 +281,8 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertFalse(evt.play, evt)
|
||||
self.assertFalse(evt.task, evt)
|
||||
self.assertEqual(evt.failed, should_be_failed)
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
if not async:
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
qs = job_events.filter(event='playbook_on_play_start')
|
||||
@ -241,7 +292,8 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertTrue(evt.play, evt)
|
||||
self.assertFalse(evt.task, evt)
|
||||
self.assertEqual(evt.failed, should_be_failed)
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
if not async:
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
qs = job_events.filter(event='playbook_on_task_start')
|
||||
@ -251,21 +303,56 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertTrue(evt.play, evt)
|
||||
self.assertTrue(evt.task, evt)
|
||||
self.assertEqual(evt.failed, should_be_failed)
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
if not async:
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
qs = job_events.filter(event=('runner_on_%s' % runner_status))
|
||||
self.assertEqual(qs.count(), tasks)
|
||||
if async and async_timeout:
|
||||
pass
|
||||
elif async:
|
||||
self.assertTrue(qs.count())
|
||||
else:
|
||||
self.assertEqual(qs.count(), tasks)
|
||||
for evt in qs:
|
||||
self.assertEqual(evt.host, self.host)
|
||||
self.assertTrue(evt.play, evt)
|
||||
self.assertTrue(evt.task, evt)
|
||||
self.assertEqual(evt.failed, should_be_failed)
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
if not async:
|
||||
self.assertEqual(evt.changed, should_be_changed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
if async:
|
||||
qs = job_events.filter(event='runner_on_async_poll')
|
||||
if not async_nowait:
|
||||
self.assertTrue(qs.count())
|
||||
for evt in qs:
|
||||
self.assertEqual(evt.host, self.host)
|
||||
self.assertTrue(evt.play, evt)
|
||||
self.assertTrue(evt.task, evt)
|
||||
self.assertEqual(evt.failed, False)#should_be_failed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
qs = job_events.filter(event=('runner_on_async_%s' % runner_status))
|
||||
# Ansible 1.2 won't call the runner_on_async_failed callback when a
|
||||
# timeout occurs, so skip this check for now.
|
||||
if not async_timeout and not async_nowait:
|
||||
self.assertEqual(qs.count(), tasks)
|
||||
for evt in qs:
|
||||
self.assertEqual(evt.host, self.host)
|
||||
self.assertTrue(evt.play, evt)
|
||||
self.assertTrue(evt.task, evt)
|
||||
self.assertEqual(evt.failed, should_be_failed)
|
||||
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
|
||||
host_pks)
|
||||
qs = job_events.filter(event__startswith='runner_')
|
||||
qs = qs.exclude(event=('runner_on_%s' % runner_status))
|
||||
if async:
|
||||
if runner_status == 'failed':
|
||||
qs = qs.exclude(event='runner_on_ok')
|
||||
qs = qs.exclude(event='runner_on_async_poll')
|
||||
qs = qs.exclude(event=('runner_on_async_%s' % runner_status))
|
||||
self.assertEqual(qs.count(), 0)
|
||||
|
||||
def test_run_job(self):
|
||||
@ -643,3 +730,116 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.check_job_result(job, 'successful')
|
||||
self.assertTrue('ssh-agent' in self.run_job_args)
|
||||
self.assertTrue('Bad passphrase' not in job.result_stdout)
|
||||
|
||||
def test_run_async_job(self):
|
||||
self.create_test_project(TEST_ASYNC_OK_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.get_passwords_needed_to_start())
|
||||
self.assertTrue(job.start())
|
||||
self.assertEqual(job.status, 'pending')
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'successful')
|
||||
self.check_job_events(job, 'ok', 1, 1, async=True)
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
self.assertFalse(job_host_summary.failed)
|
||||
self.assertEqual(job_host_summary.host.last_job_host_summary,
|
||||
job_host_summary)
|
||||
self.host = Host.objects.get(pk=self.host.pk)
|
||||
self.assertEqual(self.host.last_job, job)
|
||||
self.assertFalse(self.host.has_active_failures)
|
||||
for group in self.host.all_groups:
|
||||
self.assertFalse(group.has_active_failures)
|
||||
self.assertFalse(self.host.inventory.has_active_failures)
|
||||
self.assertEqual(job.successful_hosts.count(), 1)
|
||||
self.assertEqual(job.failed_hosts.count(), 0)
|
||||
self.assertEqual(job.changed_hosts.count(), 1)
|
||||
self.assertEqual(job.unreachable_hosts.count(), 0)
|
||||
self.assertEqual(job.skipped_hosts.count(), 0)
|
||||
self.assertEqual(job.processed_hosts.count(), 1)
|
||||
|
||||
def test_run_async_job_that_fails(self):
|
||||
self.create_test_project(TEST_ASYNC_FAIL_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.get_passwords_needed_to_start())
|
||||
self.assertTrue(job.start())
|
||||
self.assertEqual(job.status, 'pending')
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'failed')
|
||||
self.check_job_events(job, 'failed', 1, 1, async=True)
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
self.assertTrue(job_host_summary.failed)
|
||||
self.assertEqual(job_host_summary.host.last_job_host_summary,
|
||||
job_host_summary)
|
||||
self.host = Host.objects.get(pk=self.host.pk)
|
||||
self.assertEqual(self.host.last_job, job)
|
||||
self.assertTrue(self.host.has_active_failures)
|
||||
for group in self.host.all_groups:
|
||||
self.assertTrue(group.has_active_failures)
|
||||
self.assertTrue(self.host.inventory.has_active_failures)
|
||||
self.assertEqual(job.successful_hosts.count(), 1) # FIXME: Is this right?
|
||||
self.assertEqual(job.failed_hosts.count(), 1)
|
||||
self.assertEqual(job.changed_hosts.count(), 0)
|
||||
self.assertEqual(job.unreachable_hosts.count(), 0)
|
||||
self.assertEqual(job.skipped_hosts.count(), 0)
|
||||
self.assertEqual(job.processed_hosts.count(), 1)
|
||||
|
||||
def test_run_async_job_that_times_out(self):
|
||||
self.create_test_project(TEST_ASYNC_TIMEOUT_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.get_passwords_needed_to_start())
|
||||
self.assertTrue(job.start())
|
||||
self.assertEqual(job.status, 'pending')
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'failed')
|
||||
self.check_job_events(job, 'failed', 1, 1, async=True,
|
||||
async_timeout=True)
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
self.assertTrue(job_host_summary.failed)
|
||||
self.assertEqual(job_host_summary.host.last_job_host_summary,
|
||||
job_host_summary)
|
||||
self.host = Host.objects.get(pk=self.host.pk)
|
||||
self.assertEqual(self.host.last_job, job)
|
||||
self.assertTrue(self.host.has_active_failures)
|
||||
for group in self.host.all_groups:
|
||||
self.assertTrue(group.has_active_failures)
|
||||
self.assertTrue(self.host.inventory.has_active_failures)
|
||||
self.assertEqual(job.successful_hosts.count(), 1) # FIXME: Is this right?
|
||||
self.assertEqual(job.failed_hosts.count(), 1)
|
||||
self.assertEqual(job.changed_hosts.count(), 0)
|
||||
self.assertEqual(job.unreachable_hosts.count(), 0)
|
||||
self.assertEqual(job.skipped_hosts.count(), 0)
|
||||
self.assertEqual(job.processed_hosts.count(), 1)
|
||||
|
||||
def test_run_async_job_fire_and_forget(self):
|
||||
self.create_test_project(TEST_ASYNC_NOWAIT_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
job = self.create_test_job(job_template=job_template)
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertFalse(job.get_passwords_needed_to_start())
|
||||
self.assertTrue(job.start())
|
||||
self.assertEqual(job.status, 'pending')
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.check_job_result(job, 'successful')
|
||||
self.check_job_events(job, 'ok', 1, 1, async=True, async_nowait=True)
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
self.assertFalse(job_host_summary.failed)
|
||||
self.assertEqual(job_host_summary.host.last_job_host_summary,
|
||||
job_host_summary)
|
||||
self.host = Host.objects.get(pk=self.host.pk)
|
||||
self.assertEqual(self.host.last_job, job)
|
||||
self.assertFalse(self.host.has_active_failures)
|
||||
for group in self.host.all_groups:
|
||||
self.assertFalse(group.has_active_failures)
|
||||
self.assertFalse(self.host.inventory.has_active_failures)
|
||||
self.assertEqual(job.successful_hosts.count(), 1)
|
||||
self.assertEqual(job.failed_hosts.count(), 0)
|
||||
self.assertEqual(job.changed_hosts.count(), 0)
|
||||
self.assertEqual(job.unreachable_hosts.count(), 0)
|
||||
self.assertEqual(job.skipped_hosts.count(), 0)
|
||||
self.assertEqual(job.processed_hosts.count(), 1)
|
||||
|
Loading…
Reference in New Issue
Block a user