1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

Initial work towards the celery refactor... adjusting logic to allow building a worker chain... temporarily relax requirements on status checks

This commit is contained in:
Matthew Jones 2014-01-27 11:37:18 -05:00
parent 188d7b41bb
commit 4a598d7c0a
6 changed files with 87 additions and 98 deletions

View File

@ -366,7 +366,7 @@ class CommonTask(PrimordialModel):
def _get_passwords_needed_to_start(self): def _get_passwords_needed_to_start(self):
return [] return []
def start(self, **kwargs): def start_signature(self, **kwargs):
task_class = self._get_task_class() task_class = self._get_task_class()
if not self.can_start: if not self.can_start:
return False return False
@ -377,7 +377,13 @@ class CommonTask(PrimordialModel):
self.status = 'pending' self.status = 'pending'
self.save(update_fields=['status']) self.save(update_fields=['status'])
transaction.commit() transaction.commit()
task_result = task_class().delay(self.pk, **opts) task_actual = task_class().si(self.pk, **opts)
return task_actual
def start(self, **kwargs):
task_actual = self.start_signature(**kwargs)
# TODO: Callback for status
task_result = task_actual.delay()
# Reload instance from database so we don't clobber results from task # Reload instance from database so we don't clobber results from task
# (mainly from tests when using Django 1.4.x). # (mainly from tests when using Django 1.4.x).
instance = self.__class__.objects.get(pk=self.pk) instance = self.__class__.objects.get(pk=self.pk)

View File

@ -685,6 +685,12 @@ class InventorySource(PrimordialModel):
# FIXME: Prevent update when another one is active! # FIXME: Prevent update when another one is active!
return bool(self.source) return bool(self.source)
def update_signature(self, **kwargs):
if self.can_update:
inventory_update = self.inventory_updates.create()
inventory_update_sig = inventory_update.start_signature()
return (inventory_update, inventory_update_sig)
def update(self, **kwargs): def update(self, **kwargs):
if self.can_update: if self.can_update:
inventory_update = self.inventory_updates.create() inventory_update = self.inventory_updates.create()

View File

@ -18,6 +18,7 @@ import yaml
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import models from django.db import models
from django.db import transaction
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
@ -30,6 +31,11 @@ from jsonfield import JSONField
# AWX # AWX
from awx.main.models.base import * from awx.main.models.base import *
# Celery
from celery import chain
logger = logging.getLogger('awx.main.models.jobs')
__all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent'] __all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent']
@ -328,6 +334,51 @@ class Job(CommonTask):
def processed_hosts(self): def processed_hosts(self):
return self._get_hosts(job_host_summaries__processed__gt=0) return self._get_hosts(job_host_summaries__processed__gt=0)
def start(self, **kwargs):
task_class = self._get_task_class()
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
self.status = 'waiting'
self.save(update_fields=['status'])
transaction.commit()
runnable_tasks = []
inventory_updates_actual = []
project_update_actual = None
project = self.project
inventory = self.inventory
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
if project.scm_update_on_launch:
# TODO: We assume these return a tuple but not on error
project_update, project_update_sig = project.update_signature()
if not project_update:
# TODO: Set error here
pass
else:
project_update_actual = project_update
# TODO: append a callback to gather the status?
runnable_tasks.append(project_update_sig)
# TODO: need to add celery task id to proj update instance
if is_qs.count():
for inventory_source in is_qs:
# TODO: We assume these return a tuple but not on error
inventory_update, inventory_update_sig = inventory_source.update_signature()
if not inventory_update:
# TODO: Set error here
pass
else:
inventory_updates_actual.append(inventory_update)
runnable_tasks.append(inventory_update_sig)
job_actual = task_class().si(self.pk, **opts)
runnable_tasks.append(job_actual)
print runnable_tasks
res = chain(runnable_tasks)()
return True
class JobHostSummary(BaseModel): class JobHostSummary(BaseModel):
''' '''

View File

@ -282,6 +282,12 @@ class Project(CommonModel):
# FIXME: Prevent update when another one is active! # FIXME: Prevent update when another one is active!
return bool(self.scm_type)# and not self.current_update) return bool(self.scm_type)# and not self.current_update)
def update_signature(self, **kwargs):
if self.can_update:
project_update = self.project_updates.create()
project_update_sig = project_update.start_signature()
return (project_update, project_updaate_sig)
def update(self, **kwargs): def update(self, **kwargs):
if self.can_update: if self.can_update:
project_update = self.project_updates.create() project_update = self.project_updates.create()

View File

@ -108,6 +108,7 @@ class BaseTask(Task):
'yes': 'yes', 'yes': 'yes',
'no': 'no', 'no': 'no',
'': '', '': '',
} }
def build_env(self, instance, **kwargs): def build_env(self, instance, **kwargs):
@ -208,10 +209,12 @@ class BaseTask(Task):
# we have a way to know the task is still running, otherwise the # we have a way to know the task is still running, otherwise the
# post_run_hook below would cancel long-running tasks that are # post_run_hook below would cancel long-running tasks that are
# really still active). # really still active).
instance = self.update_model(instance.pk, status='running') #TODO: Find replacement for cancel flag
if instance.cancel_flag: #TODO: Something about checking celery status
child.close(True) # instance = self.update_model(instance.pk, status='running')
canceled = True # if instance.cancel_flag:
# child.close(True)
# canceled = True
# FIXME: Find a way to determine if task is hung waiting at a prompt. # FIXME: Find a way to determine if task is hung waiting at a prompt.
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True) child.close(True)
@ -444,95 +447,11 @@ class RunJob(BaseTask):
''' '''
Hook for checking job before running. Hook for checking job before running.
''' '''
project_update = None
inventory_updates = None
while True:
pk = job.pk
if job.status in ('pending', 'waiting'): if job.status in ('pending', 'waiting'):
project = job.project job = self.update_model(job.pk, status='pending')
pu_qs = project.project_updates.filter(status__in=('pending', 'running'))
inventory = job.inventory
base_iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory)
iu_qs = base_iu_qs.filter(status__in=('pending', 'running'))
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
# Refresh the current project_update instance (if set).
if project_update:
try:
project_update = project.project_updates.filter(pk=project_update.pk)[0]
except IndexError:
msg = 'Unable to check project update.'
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Refresh the current inventory_update instance(s) (if set).
if inventory_updates:
inventory_update_pks = [x.pk for x in inventory_updates]
inventory_updates = list(base_iu_qs.filter(pk__in=inventory_update_pks))
# If the job needs to update the project first (and there is no
# specific project update defined).
if not project_update and project.scm_update_on_launch:
job = self.update_model(pk, status='waiting')
try:
project_update = pu_qs[0]
except IndexError:
project_update = project.update()
if not project_update:
msg = 'Unable to update project before launch.'
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
#print 'job %d waiting on project update %d' % (pk, project_update.pk)
time.sleep(2.0)
# If the job needs to update any inventory first (and there are
# no current inventory updates pending).
elif inventory_updates is None and is_qs.count():
job = self.update_model(pk, status='waiting')
inventory_updates = []
msgs = []
for inventory_source in is_qs:
try:
inventory_update = iu_qs.filter(inventory_source=inventory_source)[0]
except IndexError:
inventory_update = inventory_source.update()
if not inventory_update:
msgs.append('Unable to update inventory source %d before launch' % inventory_source.pk)
continue
inventory_updates.append(inventory_update)
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
time.sleep(2.0)
# If project update has failed, abort the job.
elif project_update and project_update.failed:
msg = 'Project update %d failed with status = %s.' % (project_update.pk, project_update.status)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# If any inventory update has failed, abort the job.
elif inventory_updates and any([x.failed for x in inventory_updates]):
msgs = []
for inventory_update in inventory_updates:
if inventory_update.failed:
msgs.append('Inventory update %d failed with status = %s.' % (inventory_update.pk, inventory_update.status))
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Check if blocked by any other active project or inventory updates.
elif pu_qs.count() or iu_qs.count():
#print 'job %d waiting on' % pk, pu_qs
job = self.update_model(pk, status='waiting')
time.sleep(4.0)
# Otherwise continue running the job.
else:
job = self.update_model(pk, status='pending')
return True return True
elif job.cancel_flag: elif job.cancel_flag:
job = self.update_model(pk, status='canceled') job = self.update_model(job.pk, status='canceled')
return False return False
else: else:
return False return False

View File

@ -282,6 +282,7 @@ CELERYD_TASK_TIME_LIMIT = None
CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERYBEAT_MAX_LOOP_INTERVAL = 60
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# Any ANSIBLE_* settings will be passed to the subprocess environment by the # Any ANSIBLE_* settings will be passed to the subprocess environment by the
# celery task. # celery task.