diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index c57d32968c..64a002f062 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -377,7 +377,11 @@ class JobHostSummary(BaseModel): return reverse('api:job_host_summary_detail', args=(self.pk,)) def save(self, *args, **kwargs): + # If update_fields has been specified, add our field names to it, + # if it hasn't been specified, then we're just doing a normal save. + update_fields = kwargs.get('update_fields', []) self.failed = bool(self.dark or self.failures) + update_fields.append('failed') super(JobHostSummary, self).save(*args, **kwargs) self.update_host_last_job_summary() @@ -608,6 +612,9 @@ class JobEvent(BaseModel): return None def save(self, *args, **kwargs): + # If update_fields has been specified, add our field names to it, + # if it hasn't been specified, then we're just doing a normal save. + update_fields = kwargs.get('update_fields', []) res = self.event_data.get('res', None) # Workaround for Ansible 1.2, where the runner_on_async_ok event is # created even when the async task failed. Change the event to be @@ -621,26 +628,37 @@ class JobEvent(BaseModel): if self.event in self.FAILED_EVENTS: if not self.event_data.get('ignore_errors', False): self.failed = True + if 'failed' not in update_fields: + update_fields.append('failed') if isinstance(res, dict) and res.get('changed', False): self.changed = True + if 'changed' not in update_fields: + update_fields.append('changed') if self.event == 'playbook_on_stats': try: failures_dict = self.event_data.get('failures', {}) dark_dict = self.event_data.get('dark', {}) self.failed = bool(sum(failures_dict.values()) + sum(dark_dict.values())) + if 'failed' not in update_fields: + update_fields.append('failed') changed_dict = self.event_data.get('changed', {}) self.changed = bool(sum(changed_dict.values())) + if 'changed' not in update_fields: + update_fields.append('changed') except (AttributeError, TypeError): pass try: if not self.host and self.event_data.get('host', ''): self.host = self.job.inventory.hosts.get(name=self.event_data['host']) + if 'host' not in update_fields: + update_fields.append('host') except (Host.DoesNotExist, AttributeError): pass self.play = self.event_data.get('play', '') self.task = self.event_data.get('task', '') self.parent = self._find_parent() + update_fields.extend(['play', 'task', 'parent']) super(JobEvent, self).save(*args, **kwargs) self.update_parent_failed_and_changed() self.update_hosts() @@ -650,15 +668,15 @@ class JobEvent(BaseModel): # Propagage failed and changed flags to parent events. if self.parent: parent = self.parent - save_parent = False + update_fields = [] if self.failed and not parent.failed: parent.failed = True - save_parent = True + update_fields.append('failed') if self.changed and not parent.changed: parent.changed = True - save_parent = True - if save_parent: - parent.save() + update_fields.append('changed') + if update_fields: + parent.save(update_fields=update_fields) parent.update_parent_failed_and_changed() def update_hosts(self, extra_hosts=None): @@ -700,14 +718,14 @@ class JobEvent(BaseModel): except Host.DoesNotExist: continue host_summary = self.job.job_host_summaries.get_or_create(host=host)[0] - host_summary_changed = False + update_fields = [] for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'): try: value = self.event_data.get(stat, {}).get(hostname, 0) if getattr(host_summary, stat) != value: setattr(host_summary, stat, value) - host_summary_changed = True + update_fields.append(stat) except AttributeError: # in case event_data[stat] isn't a dict. pass - if host_summary_changed: - host_summary.save() + if update_fields: + host_summary.save(update_fields=update_fields) diff --git a/awx/main/signals.py b/awx/main/signals.py index 44fa6447b2..2558e4ad88 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -42,6 +42,8 @@ def update_inventory_computed_fields(sender, **kwargs): else: sender_name = unicode(sender._meta.verbose_name) if kwargs['signal'] == post_save: + if sender == Job and instance.active: + return sender_action = 'saved' elif kwargs['signal'] == post_delete: sender_action = 'deleted' @@ -101,7 +103,7 @@ def migrate_children_from_deleted_group_to_parent_groups(sender, **kwargs): @receiver(pre_save, sender=Group) def save_related_pks_before_group_marked_inactive(sender, **kwargs): instance = kwargs['instance'] - if not instance.pk: + if not instance.pk or instance.active: return instance._saved_parents_pks = set(instance.parents.values_list('pk', flat=True)) instance._saved_hosts_pks = set(instance.hosts.values_list('pk', flat=True)) @@ -154,6 +156,8 @@ def _update_host_last_jhs(host): @receiver(post_save, sender=Job) def update_host_last_job_when_job_marked_inactive(sender, **kwargs): instance = kwargs['instance'] + if instance.active: + return hosts_qs = Host.objects.filter(active=True, last_job__pk=instance.pk) for host in hosts_qs: _update_host_last_jhs(host) diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py index 0018851279..41e9414b21 100644 --- a/awx/main/tests/jobs.py +++ b/awx/main/tests/jobs.py @@ -2,10 +2,12 @@ # All Rights Reserved. # Python -import datetime +import contextlib import json import socket import struct +import threading +import urlparse import uuid # Django @@ -17,12 +19,15 @@ import django.test from django.test.client import Client from django.test.utils import override_settings +# Requests +import requests + # AWX from awx.main.models import * from awx.main.tests.base import BaseTestMixin __all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest', - 'JobTemplateCallbackTest'] + 'JobTemplateCallbackTest', 'JobTransactionTest'] TEST_PLAYBOOK = '''- hosts: all gather_facts: false @@ -31,6 +36,16 @@ TEST_PLAYBOOK = '''- hosts: all command: test 1 = 1 ''' +TEST_ASYNC_PLAYBOOK = ''' +- hosts: all + gather_facts: false + tasks: + - name: async task should pass + command: sleep 10 + async: 20 + poll: 1 +''' + class BaseJobTestMixin(BaseTestMixin): '''''' @@ -1312,3 +1327,65 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase): host = host_qs[0] host_ip = self.get_test_ips_for_host(host.name)[0] self.post(url, data, expect=400, remote_addr=host_ip) + + +@override_settings(CELERY_ALWAYS_EAGER=True, + CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, + ANSIBLE_TRANSPORT='local') +class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase): + '''Job test of transaction locking using the celery task backend.''' + + def setUp(self): + super(JobTransactionTest, self).setUp() + settings.INTERNAL_API_URL = self.live_server_url + + def tearDown(self): + super(JobTransactionTest, self).tearDown() + + def _job_detail_polling_thread(self, url, auth, errors): + while True: + try: + response = requests.get(url, auth=auth) + response.raise_for_status() + data = json.loads(response.content) + if data.get('status', '') not in ('new', 'pending', 'running'): + break + except Exception, e: + errors.append(e) + break + + @contextlib.contextmanager + def poll_job_detail(self, url, auth, errors): + try: + t = threading.Thread(target=self._job_detail_polling_thread, + args=(url, auth, errors)) + t.start() + yield + finally: + t.join(20) + + # FIXME: This test isn't working for now. + def _test_get_job_detail_while_job_running(self): + self.proj_async = self.make_project('async', 'async test', + self.user_sue, TEST_ASYNC_PLAYBOOK) + self.org_eng.projects.add(self.proj_async) + job = self.job_ops_east_run + job.project = self.proj_async + job.playbook = self.proj_async.playbooks[0] + job.verbosity = 3 + job.save() + + job_detail_url = reverse('api:job_detail', args=(job.pk,)) + job_detail_url = urlparse.urljoin(self.live_server_url, job_detail_url) + auth = ('sue', self._user_passwords['sue']) + errors = [] + with self.poll_job_detail(job_detail_url, auth, errors): + with self.current_user(self.user_sue): + url = reverse('api:job_start', args=(job.pk,)) + response = self.get(url) + self.assertTrue(response['can_start']) + self.assertFalse(response['passwords_needed_to_start']) + response = self.post(url, {}, expect=202) + job = Job.objects.get(pk=job.pk) + self.assertEqual(job.status, 'successful', job.result_stdout) + self.assertFalse(errors)