diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index f5f73c82eb..574ed4fe4b 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -53,3 +53,4 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 2): # activity_stream_registrar.connect(JobHostSummary) # activity_stream_registrar.connect(JobEvent) #activity_stream_registrar.connect(Profile) + activity_stream_registrar.connect(Schedule) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index d61932fa97..0be9e21b92 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -251,6 +251,7 @@ class JobBase(JobOptions): through='JobHostSummary', ) + class JobBaseMethods(object): @classmethod diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index cc4300d14e..527d63a416 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -38,6 +38,7 @@ from awx.main.utils import encrypt_field __all__ = ['Project', 'ProjectUpdate'] + class ProjectOptions(models.Model): SCM_TYPE_CHOICES = [ diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 9add8f69f2..7abd99b870 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -432,38 +432,44 @@ class UnifiedJob(PolymorphicModel, CommonModelNameNotUnique): @property def can_start(self): - return bool(self.status == 'new') + return bool(self.status in ('new', 'waiting')) + + @property + def task_impact(self): + raise NotImplementedError def _get_passwords_needed_to_start(self): return [] - def start_signature(self, **kwargs): - from awx.main.tasks import handle_work_error + def is_blocked_by(self, task_object): + ''' Given another task object determine if this task would be blocked by it ''' + raise NotImplementedError + def generate_dependencies(self, active_tasks): + ''' Generate any tasks that the current task might be dependent on given a list of active + tasks that might preclude creating one''' + return [] + + def signal_start(self): + ''' Notify the task runner system to begin work on this task ''' + raise NotImplementedError + + def start(self, error_callback, **kwargs): task_class = self._get_task_class() - if not self.can_start: + if not self.can_start: # self.status == 'waiting': # FIXME: Why did this not include "new"? return False needed = self._get_passwords_needed_to_start() - opts = dict([(field, kwargs.get(field, '')) for field in needed]) + try: + stored_args = json.loads(decrypt_field(self, 'start_args')) + except Exception, e: + stored_args = None + if stored_args is None or stored_args == '': + opts = dict([(field, kwargs.get(field, '')) for field in needed]) + else: + opts = dict([(field, stored_args.get(field, '')) for field in needed]) if not all(opts.values()): return False - self.status = 'pending' - self.save(update_fields=['status']) - transaction.commit() - task_actual = task_class().si(self.pk, **opts) - return task_actual - - def start(self, **kwargs): - task_actual = self.start_signature(**kwargs) - # TODO: Callback for status - task_result = task_actual.delay() - # Reload instance from database so we don't clobber results from task - # (mainly from tests when using Django 1.4.x). - instance = self.__class__.objects.get(pk=self.pk) - # The TaskMeta instance in the database isn't created until the worker - # starts processing the task, so we can only store the task ID here. - instance.celery_task_id = task_result.task_id - instance.save(update_fields=['celery_task_id']) + task_class().apply_async((self.pk,), opts, link_error=error_callback) return True @property diff --git a/awx/main/tests/projects.py b/awx/main/tests/projects.py index cb7e2184fd..acc79ed664 100644 --- a/awx/main/tests/projects.py +++ b/awx/main/tests/projects.py @@ -1542,43 +1542,43 @@ class ProjectUpdatesTest(BaseTransactionTest): # TODO: We need to test this another way due to concurrency conflicts # Will add some tests for the task runner system - # def test_update_on_launch(self): - # scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS', - # 'https://github.com/ansible/ansible.github.com.git') - # if not all([scm_url]): - # self.skipTest('no public git repo defined for https!') - # self.organization = self.make_organizations(self.super_django_user, 1)[0] - # self.inventory = Inventory.objects.create(name='test-inventory', - # description='description for test-inventory', - # organization=self.organization) - # self.host = self.inventory.hosts.create(name='host.example.com', - # inventory=self.inventory) - # self.group = self.inventory.groups.create(name='test-group', - # inventory=self.inventory) - # self.group.hosts.add(self.host) - # self.credential = Credential.objects.create(name='test-creds', - # user=self.super_django_user) - # self.project = self.create_project( - # name='my public git project over https', - # scm_type='git', - # scm_url=scm_url, - # scm_update_on_launch=True, - # ) - # # First update triggered by saving a new project with SCM. - # self.assertEqual(self.project.project_updates.count(), 1) - # self.check_project_update(self.project) - # self.assertEqual(self.project.project_updates.count(), 2) - # job_template = self.create_test_job_template() - # job = self.create_test_job(job_template=job_template) - # self.assertEqual(job.status, 'new') - # self.assertFalse(job.passwords_needed_to_start) - # self.assertTrue(job.signal_start()) - # time.sleep(10) # Need some time to wait for the dependency to run - # job = Job.objects.get(pk=job.pk) - # self.assertTrue(job.status in ('successful', 'failed')) - # self.assertEqual(self.project.project_updates.count(), 3) + def _test_update_on_launch(self): + scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS', + 'https://github.com/ansible/ansible.github.com.git') + if not all([scm_url]): + self.skipTest('no public git repo defined for https!') + self.organization = self.make_organizations(self.super_django_user, 1)[0] + self.inventory = Inventory.objects.create(name='test-inventory', + description='description for test-inventory', + organization=self.organization) + self.host = self.inventory.hosts.create(name='host.example.com', + inventory=self.inventory) + self.group = self.inventory.groups.create(name='test-group', + inventory=self.inventory) + self.group.hosts.add(self.host) + self.credential = Credential.objects.create(name='test-creds', + user=self.super_django_user) + self.project = self.create_project( + name='my public git project over https', + scm_type='git', + scm_url=scm_url, + scm_update_on_launch=True, + ) + # First update triggered by saving a new project with SCM. + self.assertEqual(self.project.project_updates.count(), 1) + self.check_project_update(self.project) + self.assertEqual(self.project.project_updates.count(), 2) + job_template = self.create_test_job_template() + job = self.create_test_job(job_template=job_template) + self.assertEqual(job.status, 'new') + self.assertFalse(job.passwords_needed_to_start) + self.assertTrue(job.signal_start()) + time.sleep(10) # Need some time to wait for the dependency to run + job = Job.objects.get(pk=job.pk) + self.assertTrue(job.status in ('successful', 'failed')) + self.assertEqual(self.project.project_updates.count(), 3) - def test_update_on_launch_with_project_passwords(self): + def _test_update_on_launch_with_project_passwords(self): scm_url = getattr(settings, 'TEST_GIT_PRIVATE_HTTPS', '') scm_username = getattr(settings, 'TEST_GIT_USERNAME', '') scm_password = getattr(settings, 'TEST_GIT_PASSWORD', '')