From 4a598d7c0ab9a296be95ef20cc9eeb0795f9d4d7 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 27 Jan 2014 11:37:18 -0500 Subject: [PATCH] Initial work towards the celery refactor... adjusting logic to allow building a worker chain... temporarily relax requirements on status checks --- awx/main/models/base.py | 10 +++- awx/main/models/inventory.py | 6 ++ awx/main/models/jobs.py | 51 ++++++++++++++++ awx/main/models/projects.py | 6 ++ awx/main/tasks.py | 111 +++++------------------------------ awx/settings/defaults.py | 1 + 6 files changed, 87 insertions(+), 98 deletions(-) diff --git a/awx/main/models/base.py b/awx/main/models/base.py index e6653881cf..0182cee98d 100644 --- a/awx/main/models/base.py +++ b/awx/main/models/base.py @@ -366,7 +366,7 @@ class CommonTask(PrimordialModel): def _get_passwords_needed_to_start(self): return [] - def start(self, **kwargs): + def start_signature(self, **kwargs): task_class = self._get_task_class() if not self.can_start: return False @@ -377,7 +377,13 @@ class CommonTask(PrimordialModel): self.status = 'pending' self.save(update_fields=['status']) transaction.commit() - task_result = task_class().delay(self.pk, **opts) + task_actual = task_class().si(self.pk, **opts) + return task_actual + + def start(self, **kwargs): + task_actual = self.start_signature(**kwargs) + # TODO: Callback for status + task_result = task_actual.delay() # Reload instance from database so we don't clobber results from task # (mainly from tests when using Django 1.4.x). instance = self.__class__.objects.get(pk=self.pk) diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index dfadfd8bd4..6867fe251d 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -685,6 +685,12 @@ class InventorySource(PrimordialModel): # FIXME: Prevent update when another one is active! 
return bool(self.source) + def update_signature(self, **kwargs): + if self.can_update: + inventory_update = self.inventory_updates.create() + inventory_update_sig = inventory_update.start_signature() + return (inventory_update, inventory_update_sig) + def update(self, **kwargs): if self.can_update: inventory_update = self.inventory_updates.create() diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index d22099c676..02a5c77cb5 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -18,6 +18,7 @@ import yaml # Django from django.conf import settings from django.db import models +from django.db import transaction from django.utils.translation import ugettext_lazy as _ from django.core.exceptions import ValidationError, NON_FIELD_ERRORS from django.core.urlresolvers import reverse @@ -30,6 +31,11 @@ from jsonfield import JSONField # AWX from awx.main.models.base import * +# Celery +from celery import chain + +logger = logging.getLogger('awx.main.models.jobs') + __all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent'] @@ -328,6 +334,51 @@ class Job(CommonTask): def processed_hosts(self): return self._get_hosts(job_host_summaries__processed__gt=0) + def start(self, **kwargs): + task_class = self._get_task_class() + if not self.can_start: + return False + needed = self._get_passwords_needed_to_start() + opts = dict([(field, kwargs.get(field, '')) for field in needed]) + if not all(opts.values()): + return False + self.status = 'waiting' + self.save(update_fields=['status']) + transaction.commit() + + runnable_tasks = [] + inventory_updates_actual = [] + project_update_actual = None + + project = self.project + inventory = self.inventory + is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True) + if project.scm_update_on_launch: + # TODO: We assume these return a tuple but not on error + project_update, project_update_sig = project.update_signature() + if not project_update: + # TODO: Set error here + pass + else: + 
project_update_actual = project_update + # TODO: append a callback to gather the status? + runnable_tasks.append(project_update_sig) + # TODO: need to add celery task id to proj update instance + if is_qs.count(): + for inventory_source in is_qs: + # TODO: We assume these return a tuple but not on error + inventory_update, inventory_update_sig = inventory_source.update_signature() + if not inventory_update: + # TODO: Set error here + pass + else: + inventory_updates_actual.append(inventory_update) + runnable_tasks.append(inventory_update_sig) + job_actual = task_class().si(self.pk, **opts) + runnable_tasks.append(job_actual) + print runnable_tasks + res = chain(runnable_tasks)() + return True class JobHostSummary(BaseModel): ''' diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7a531a9d10..9738d2627f 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -282,6 +282,12 @@ class Project(CommonModel): # FIXME: Prevent update when another one is active! return bool(self.scm_type)# and not self.current_update) + def update_signature(self, **kwargs): + if self.can_update: + project_update = self.project_updates.create() + project_update_sig = project_update.start_signature() + return (project_update, project_update_sig) + def update(self, **kwargs): if self.can_update: project_update = self.project_updates.create() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7b2702cea3..2b31fe8151 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -108,6 +108,7 @@ class BaseTask(Task): 'yes': 'yes', 'no': 'no', '': '', + } def build_env(self, instance, **kwargs): @@ -208,10 +209,12 @@ class BaseTask(Task): # we have a way to know the task is still running, otherwise the # post_run_hook below would cancel long-running tasks that are # really still active). 
- instance = self.update_model(instance.pk, status='running') - if instance.cancel_flag: - child.close(True) - canceled = True + #TODO: Find replacement for cancel flag + #TODO: Something about checking celery status + # instance = self.update_model(instance.pk, status='running') + # if instance.cancel_flag: + # child.close(True) + # canceled = True # FIXME: Find a way to determine if task is hung waiting at a prompt. if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) @@ -444,98 +447,14 @@ class RunJob(BaseTask): ''' Hook for checking job before running. ''' - project_update = None - inventory_updates = None - while True: - pk = job.pk - if job.status in ('pending', 'waiting'): - project = job.project - pu_qs = project.project_updates.filter(status__in=('pending', 'running')) - inventory = job.inventory - base_iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory) - iu_qs = base_iu_qs.filter(status__in=('pending', 'running')) - is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True) - # Refresh the current project_update instance (if set). - if project_update: - try: - project_update = project.project_updates.filter(pk=project_update.pk)[0] - except IndexError: - msg = 'Unable to check project update.' - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # Refresh the current inventory_update instance(s) (if set). - if inventory_updates: - inventory_update_pks = [x.pk for x in inventory_updates] - inventory_updates = list(base_iu_qs.filter(pk__in=inventory_update_pks)) - - # If the job needs to update the project first (and there is no - # specific project update defined). - if not project_update and project.scm_update_on_launch: - job = self.update_model(pk, status='waiting') - try: - project_update = pu_qs[0] - except IndexError: - project_update = project.update() - if not project_update: - msg = 'Unable to update project before launch.' 
- job = self.update_model(pk, status='error', - result_traceback=msg) - return False - #print 'job %d waiting on project update %d' % (pk, project_update.pk) - time.sleep(2.0) - # If the job needs to update any inventory first (and there are - # no current inventory updates pending). - elif inventory_updates is None and is_qs.count(): - job = self.update_model(pk, status='waiting') - inventory_updates = [] - msgs = [] - for inventory_source in is_qs: - try: - inventory_update = iu_qs.filter(inventory_source=inventory_source)[0] - except IndexError: - inventory_update = inventory_source.update() - if not inventory_update: - msgs.append('Unable to update inventory source %d before launch' % inventory_source.pk) - continue - inventory_updates.append(inventory_update) - if msgs: - msg = '\n'.join(msgs) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - time.sleep(2.0) - # If project update has failed, abort the job. - elif project_update and project_update.failed: - msg = 'Project update %d failed with status = %s.' % (project_update.pk, project_update.status) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # If any inventory update has failed, abort the job. - elif inventory_updates and any([x.failed for x in inventory_updates]): - msgs = [] - for inventory_update in inventory_updates: - if inventory_update.failed: - msgs.append('Inventory update %d failed with status = %s.' % (inventory_update.pk, inventory_update.status)) - if msgs: - msg = '\n'.join(msgs) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # Check if blocked by any other active project or inventory updates. - elif pu_qs.count() or iu_qs.count(): - #print 'job %d waiting on' % pk, pu_qs - job = self.update_model(pk, status='waiting') - time.sleep(4.0) - # Otherwise continue running the job. 
- else: - job = self.update_model(pk, status='pending') - return True - elif job.cancel_flag: - job = self.update_model(pk, status='canceled') - return False - else: - return False + if job.status in ('pending', 'waiting'): + job = self.update_model(job.pk, status='pending') + return True + elif job.cancel_flag: + job = self.update_model(job.pk, status='canceled') + return False + else: + return False def post_run_hook(self, job, **kwargs): ''' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1498d9ef27..fca79f1480 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -282,6 +282,7 @@ CELERYD_TASK_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 +CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # Any ANSIBLE_* settings will be passed to the subprocess environment by the # celery task.