diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index a81bcb6aca..bcdde810e9 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -798,34 +798,43 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
                     status=self.status,
                     traceback=self.result_traceback)
 
-    def start(self, error_callback, success_callback, **kwargs):
-        '''
-        Start the task running via Celery.
-        '''
-        task_class = self._get_task_class()
+    def pre_start(self, **kwargs):
         if not self.can_start:
             self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting')))
             self.save(update_fields=['job_explanation'])
-            return False
+            return (False, None)
+
         needed = self.get_passwords_needed_to_start()
         try:
             start_args = json.loads(decrypt_field(self, 'start_args'))
         except Exception:
             start_args = None
+
         if start_args in (None, ''):
             start_args = kwargs
+
         opts = dict([(field, start_args.get(field, '')) for field in needed])
+
         if not all(opts.values()):
             missing_fields = ', '.join([k for k,v in opts.items() if not v])
             self.job_explanation = u'Missing needed fields: %s.' % missing_fields
             self.save(update_fields=['job_explanation'])
-            return False
-        #extra_data = dict([(field, kwargs[field]) for field in kwargs
-        #                   if field not in needed])
+            return (False, None)
+
         if 'extra_vars' in kwargs:
             self.handle_extra_data(kwargs['extra_vars'])
-        task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
-        return True
+
+        return (True, opts)
+
+    def start(self, error_callback, success_callback, **kwargs):
+        '''
+        Start the task running via Celery.
+        '''
+        task_class = self._get_task_class()
+        (res, opts) = self.pre_start(**kwargs)
+        if res:
+            task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
+        return res
 
     def signal_start(self, **kwargs):
         """Notify the task runner system to begin work on this task."""
@@ -852,6 +861,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         self.update_fields(start_args=json.dumps(kwargs), status='pending')
         self.socketio_emit_status("pending")
 
+        print("Running job launch for job %s" % self.name)
         from awx.main.scheduler.tasks import run_job_launch
         run_job_launch.delay(self.id)
 
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index 3c95fb17e8..68066ee58a 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -240,3 +240,11 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
     def get_notification_friendly_name(self):
         return "Workflow Job"
 
+    def start(self, *args, **kwargs):
+        (res, opts) = self.pre_start(**kwargs)
+        if res:
+            self.status = 'running'
+            self.save()
+            self.socketio_emit_status("running")
+        return res
+
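The refactor above splits UnifiedJob.start() into two phases: pre_start() performs the can_start/password/extra_vars checks and returns a (success, opts) tuple, while start() only dispatches the Celery task when pre_start() succeeds. WorkflowJob.start() reuses pre_start() but skips Celery entirely and just flips the status, since the scheduler now drives the workflow graph. A minimal, self-contained sketch of that contract (not AWX code; the Fake* names and the print stand-in for apply_async are invented for illustration):

class FakeJob(object):
    can_start = True
    needed_passwords = ()

    def pre_start(self, **kwargs):
        # Validation only: return (ok, opts); nothing is dispatched here.
        if not self.can_start:
            return (False, None)
        opts = dict((field, kwargs.get(field, '')) for field in self.needed_passwords)
        if not all(opts.values()):
            return (False, None)
        return (True, opts)

    def start(self, error_callback=None, success_callback=None, **kwargs):
        ok, opts = self.pre_start(**kwargs)
        if ok:
            # Real code: task_class().apply_async((self.pk,), opts, link_error=..., link=...)
            print("dispatching celery task with opts=%r" % (opts,))
        return ok


class FakeWorkflowJob(FakeJob):
    status = 'new'

    def start(self, *args, **kwargs):
        ok, _opts = self.pre_start(**kwargs)
        if ok:
            # No Celery task: the scheduler spawns and tracks the graph's jobs.
            self.status = 'running'
        return ok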
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 1c3a1bc515..f10cb2dcd6 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -4,13 +4,14 @@
 # Python
 import datetime
 import logging
+import struct, fcntl, os
 
 # Django
 from django.conf import settings
+from django.db import transaction
 
 # AWX
 from awx.main.models import * # noqa
-from awx.main.tasks import handle_work_error, handle_work_success
 from awx.main.utils import get_system_task_capacity
 from awx.main.scheduler.dag_simple import SimpleDAG
 from awx.main.scheduler.dag_workflow import WorkflowDAG
@@ -47,8 +48,8 @@ def get_running_workflow_jobs():
                           WorkflowJob.objects.filter(status='running')]
     return graph_workflow_jobs
 
-def do_spawn_workflow_jobs():
-    workflow_jobs = get_running_workflow_jobs()
+def spawn_workflow_graph_jobs(workflow_jobs):
+    # TODO: Consider using transaction.atomic
     for workflow_job in workflow_jobs:
         dag = WorkflowDAG(workflow_job)
         spawn_nodes = dag.bfs_nodes_to_run()
@@ -69,6 +70,16 @@ def do_spawn_workflow_jobs():
             # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ?
             #emit_websocket_notification('/socket.io/jobs', '', dict(id=))
 
+# See comment in tasks.py::RunWorkflowJob::run()
+def process_finished_workflow_jobs(workflow_jobs):
+    for workflow_job in workflow_jobs:
+        dag = WorkflowDAG(workflow_job)
+        if dag.is_workflow_done():
+            with transaction.atomic():
+                # TODO: detect if wfj failed
+                workflow_job.status = 'completed'
+                workflow_job.save()
+                workflow_job.socketio_emit_status('completed')
 
 def rebuild_graph():
     """Regenerate the task graph by refreshing known tasks from Tower, purging
@@ -88,8 +99,6 @@ def rebuild_graph():
         logger.warn("Ignoring celery task inspector")
         active_task_queues = None
 
-    do_spawn_workflow_jobs()
-
     all_sorted_tasks = get_tasks()
    if not len(all_sorted_tasks):
         return None
@@ -106,12 +115,13 @@
            return None
 
     running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
+    running_celery_tasks = filter(lambda t: type(t) != WorkflowJob, running_tasks)
     waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
     new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
     # Check running tasks and make sure they are active in celery
     logger.debug("Active celery tasks: " + str(active_tasks))
-    for task in list(running_tasks):
+    for task in list(running_celery_tasks):
         if (task.celery_task_id not in active_tasks and
                 not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
             # NOTE: Pull status again and make sure it didn't finish in
             # the meantime?
@@ -122,7 +132,7 @@
             ))
             task.save()
             task.socketio_emit_status("failed")
-            running_tasks.pop(running_tasks.index(task))
+            running_tasks.remove(task)
             logger.error("Task %s appears orphaned... marking as failed" % task)
 
     # Create and process dependencies for new tasks
@@ -171,6 +181,8 @@ def process_graph(graph, task_capacity):
     """Given a task dependency graph, start and manage tasks given their
     priority and weight.
     """
+    from awx.main.tasks import handle_work_error, handle_work_success
+
     leaf_nodes = graph.get_leaf_nodes()
     running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
     running_impact = sum([t['node_object'].task_impact for t in running_nodes])
@@ -190,33 +202,57 @@ def process_graph(graph, task_capacity):
             node_dependencies = graph.get_dependents(node_obj)
             # Allow other tasks to continue if a job fails, even if they are
             # other jobs.
-            if graph.get_node_type(node_obj) == 'job':
+
+            node_type = graph.get_node_type(node_obj)
+            if node_type == 'job':
+                # clear dependencies because a job can block (not necessarily
+                # depend) on other jobs that share the same job template
                 node_dependencies = []
+
+            # Make the workflow_job look like it's started by setting status to
+            # running, but don't make a celery Task for it.
+            # Introduce jobs from the workflow so they are candidates to run.
+            # Call process_graph() again to allow the newly created candidate
+            # jobs to be chosen to run.
+            elif node_type == 'workflow_job':
+                node_obj.start()
+                spawn_workflow_graph_jobs([node_obj])
+                return process_graph(graph, task_capacity)
+
             dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \
                               [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in node_dependencies]
             error_handler = handle_work_error.s(subtasks=dependent_nodes)
             success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj),
                                                                  'id': node_obj.id})
-            start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler)
-            if not start_status:
-                node_obj.status = 'failed'
-                if node_obj.job_explanation:
-                    node_obj.job_explanation += ' '
-                node_obj.job_explanation += 'Task failed pre-start check.'
-                node_obj.save()
-                continue
+            with transaction.atomic():
+                start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler)
+                if not start_status:
+                    node_obj.status = 'failed'
+                    if node_obj.job_explanation:
+                        node_obj.job_explanation += ' '
+                    node_obj.job_explanation += 'Task failed pre-start check.'
+                    node_obj.save()
+                    continue
             remaining_volume -= impact
             running_impact += impact
             logger.info('Started Node: %s (capacity hit: %s) '
                         'Remaining Capacity: %s' %
                         (str(node_obj), str(impact), str(remaining_volume)))
-
-
 def schedule():
+    lockfile = open("/tmp/tower_scheduler.lock", "w")
+    fcntl.lockf(lockfile, fcntl.LOCK_EX)
+
     task_capacity = get_system_task_capacity()
+
+    workflow_jobs = get_running_workflow_jobs()
+    process_finished_workflow_jobs(workflow_jobs)
+    spawn_workflow_graph_jobs(workflow_jobs)
+
     graph = rebuild_graph()
     if graph:
         process_graph(graph, task_capacity)
+    fcntl.lockf(lockfile, fcntl.LOCK_UN)
+
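schedule() is now serialized across processes with an fcntl lock on /tmp/tower_scheduler.lock, so two celery workers cannot run overlapping scheduling passes. As written, LOCK_UN is only reached on the happy path; if rebuild_graph() or process_graph() raises, the lock stays held until the process exits. A small sketch of the same fcntl calls wrapped in a context manager (not part of this patch) that releases the lock on any exit:

import fcntl
from contextlib import contextmanager

@contextmanager
def scheduler_lock(path="/tmp/tower_scheduler.lock"):
    # LOCK_EX blocks until no other process holds a lock on this file.
    handle = open(path, "w")
    try:
        fcntl.lockf(handle, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.lockf(handle, fcntl.LOCK_UN)
        handle.close()

# Usage: the body of schedule() would run inside the lock.
# with scheduler_lock():
#     workflow_jobs = get_running_workflow_jobs()
#     process_finished_workflow_jobs(workflow_jobs)
#     spawn_workflow_graph_jobs(workflow_jobs)
#     graph = rebuild_graph()
#     if graph:
#         process_graph(graph, task_capacity)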
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 31db196a9b..bc09c58f60 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -1665,21 +1665,30 @@ class RunSystemJob(BaseTask):
     def build_cwd(self, instance, **kwargs):
         return settings.BASE_DIR
 
+'''
 class RunWorkflowJob(BaseTask):
 
     name = 'awx.main.tasks.run_workflow_job'
     model = WorkflowJob
 
     def run(self, pk, **kwargs):
-        print("I'm a running a workflow job")
-        '''
-        Run the job/task and capture its output.
-        '''
-        pass
+        #Run the job/task and capture its output.
         instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
         instance.socketio_emit_status("running")
 
-        # FIXME: Detect workflow run completion
+        # FIXME: Currently, the workflow job busy waits until the graph run is
+        # complete. Instead, the workflow job should return or never even run,
+        # because all of the "launch logic" can be done in schedule().
+
+        # However, other aspects of our system depend on a 1-1 relationship
+        # between a Job and a Celery Task.
+        #
+        # * If we let the workflow job task (RunWorkflowJob.run()) complete,
+        #   then how do we trigger the handle_work_error and
+        #   handle_work_success subtasks?
+        #
+        # * How do we handle the recovery process? (i.e. there is an entry in
+        #   the database but not in celery).
         while True:
             dag = WorkflowDAG(instance)
             if dag.is_workflow_done():
@@ -1689,4 +1698,4 @@ class RunWorkflowJob(BaseTask):
             time.sleep(1)
         instance.socketio_emit_status(instance.status)
         # TODO: Handle cancel
-
+'''
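The commented-out task above busy-waited inside a Celery worker until the workflow DAG finished; the scheduler-side replacement (process_finished_workflow_jobs in awx/main/scheduler/__init__.py above) checks completion once per schedule() pass instead. Condensed side by side, using the WorkflowDAG API the scheduler already imports — this is a paraphrase of code in this diff for comparison, not new behavior:

import time

from awx.main.scheduler.dag_workflow import WorkflowDAG


def run_workflow_busy_wait(workflow_job):
    # Old approach: ties up a Celery worker for the entire workflow run.
    while not WorkflowDAG(workflow_job).is_workflow_done():
        time.sleep(1)
    workflow_job.socketio_emit_status(workflow_job.status)


def check_workflow_once(workflow_job):
    # New approach: called from each schedule() pass; no dedicated worker.
    if WorkflowDAG(workflow_job).is_workflow_done():
        workflow_job.status = 'completed'
        workflow_job.save()
        workflow_job.socketio_emit_status('completed')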
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 20a80ecca4..445ce8924f 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -360,7 +360,7 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs',
                                                              'routing_key': 'scheduler.job.launch'},
                  'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
                                                                'routing_key': 'scheduler.job.complete'},})
-
+
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {
         'task': 'awx.main.tasks.tower_periodic_scheduler',
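The diff queues awx.main.scheduler.tasks.run_job_launch from signal_start() and routes awx.main.scheduler.tasks.run_job_complete to the 'scheduler' queue, but that tasks module itself is not part of this patch. A guess at the shape of those entry points, based only on the names referenced above and on the lock-protected schedule() pass — the decorator and bodies are assumptions, not code from this change:

# awx/main/scheduler/tasks.py (hypothetical sketch, not included in this diff)
from celery import shared_task

from awx.main.scheduler import schedule


@shared_task
def run_job_launch(job_id):
    # job_id is only a trigger; schedule() re-reads all pending/running work.
    schedule()


@shared_task
def run_job_complete(job_id):
    schedule()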