diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index e7d09f8dde..2376956da5 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -702,7 +702,8 @@ class UnifiedJobSerializer(BaseSerializer):
         model = UnifiedJob
         fields = ('*', 'unified_job_template', 'launch_type', 'status',
                   'failed', 'started', 'finished', 'elapsed', 'job_args',
-                  'job_cwd', 'job_env', 'job_explanation', 'execution_node',
+                  'job_cwd', 'job_env', 'job_explanation',
+                  'execution_node', 'controller_node',
                   'result_traceback', 'event_processing_finished')
         extra_kwargs = {
             'unified_job_template': {
@@ -3434,7 +3435,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
     class Meta:
         model = WorkflowJob
         fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',
-                  '-execution_node', '-event_processing_finished',)
+                  '-execution_node', '-event_processing_finished', '-controller_node',)
 
     def get_related(self, obj):
         res = super(WorkflowJobSerializer, self).get_related(obj)
@@ -3463,7 +3464,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
 class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer):
 
     class Meta:
-        fields = ('*', '-execution_node',)
+        fields = ('*', '-execution_node', '-controller_node',)
 
 
 class WorkflowJobCancelSerializer(WorkflowJobSerializer):
diff --git a/awx/main/migrations/0040_v330_unifiedjob_controller_node.py b/awx/main/migrations/0040_v330_unifiedjob_controller_node.py
new file mode 100644
index 0000000000..8b127dd06d
--- /dev/null
+++ b/awx/main/migrations/0040_v330_unifiedjob_controller_node.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.11 on 2018-05-25 18:58
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0039_v330_custom_venv_help_text'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='controller_node',
+            field=models.TextField(blank=True, default=b'', editable=False, help_text='The instance that managed the isolated execution environment.'),
+        ),
+    ]
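A note on the serializer hunks above: AWX's serializer Meta.fields tuples use '*' to pull in the parent serializer's field list and a '-' prefix to subtract a field from it, which is why WorkflowJobSerializer and WorkflowJobListSerializer must explicitly remove the new controller_node field that UnifiedJobSerializer now exposes. A rough sketch of that resolution convention follows (illustrative only, not AWX's actual resolver; the field names are taken from the hunks above):

def resolve_fields(parent_fields, own_fields):
    # '*' expands to the parent serializer's fields; '-name' drops one.
    resolved = []
    for name in own_fields:
        if name == '*':
            resolved.extend(parent_fields)
        elif name.startswith('-'):
            resolved.remove(name[1:])
        else:
            resolved.append(name)
    return resolved

# Workflow jobs inherit controller_node via '*' and then strip it, so the
# new field never appears in workflow job API responses:
parent = ['id', 'execution_node', 'controller_node']
print(resolve_fields(parent, ('*', '-execution_node', '-controller_node')))
# -> ['id']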
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 750472323b..62c5fc4e42 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -507,7 +507,8 @@ class StdoutMaxBytesExceeded(Exception):
         self.supported = supported
 
 
-class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
+class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique,
+                 UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
     '''
     Concrete base class for unified job run by the task engine.
     '''
@@ -571,6 +572,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         editable=False,
         help_text=_("The node the job executed on."),
     )
+    controller_node = models.TextField(
+        blank=True,
+        default='',
+        editable=False,
+        help_text=_("The instance that managed the isolated execution environment."),
+    )
     notifications = models.ManyToManyField(
         'Notification',
         editable=False,
@@ -1228,17 +1235,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             raise RuntimeError("Expected celery_task_id to be set on model.")
         kwargs['task_id'] = self.celery_task_id
         task_class = self._get_task_class()
-        args = [self.pk]
-        from awx.main.models.ha import InstanceGroup
-        ig = InstanceGroup.objects.get(name=queue)
-        if ig.controller_id:
-            if self.supports_isolation():  # case of jobs and ad hoc commands
-                isolated_instance = ig.instances.order_by('-capacity').first()
-                args.append(isolated_instance.hostname)
-            else:  # proj & inv updates, system jobs run on controller
-                queue = ig.controller.name
         kwargs['queue'] = queue
-        task_class().apply_async(args, opts, **kwargs)
+        task_class().apply_async([self.pk], opts, **kwargs)
 
     def start(self, error_callback, success_callback, **kwargs):
         '''
@@ -1400,3 +1398,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             r['{}_schedule_id'.format(name)] = self.schedule.pk
             r['{}_schedule_name'.format(name)] = self.schedule.name
         return r
+
+    def get_celery_queue_name(self):
+        return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE
+
+    def get_isolated_execution_node_name(self):
+        return self.execution_node if self.controller_node else None
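The two helpers added at the end of unified_jobs.py carry the routing logic that used to live in start_celery_task: an isolated job is dispatched to the Celery queue of its controller node, everything else to its execution node, with the default queue as a fallback. A minimal sketch of the resolution (not part of the diff; the hostnames and the 'awx' default queue name are made up for the example):

CELERY_DEFAULT_QUEUE = 'awx'  # stands in for settings.CELERY_DEFAULT_QUEUE

def get_celery_queue_name(execution_node, controller_node):
    # Isolated jobs go to their controller node's queue; everything else
    # goes to its execution node, or the default queue if unset.
    return controller_node or execution_node or CELERY_DEFAULT_QUEUE

def get_isolated_execution_node_name(execution_node, controller_node):
    # Only isolated jobs carry a controller_node, so a hostname comes
    # back only in the isolated case.
    return execution_node if controller_node else None

# Regular job: dispatched to the node it executes on.
assert get_celery_queue_name('tower-1', '') == 'tower-1'
# Isolated job: dispatched to the controller, which manages the isolated
# execution environment on the isolated node.
assert get_celery_queue_name('iso-1', 'tower-2') == 'tower-2'
assert get_isolated_execution_node_name('iso-1', 'tower-2') == 'iso-1'
# Not yet assigned anywhere: fall back to the default queue.
assert get_celery_queue_name('', '') == 'awx'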
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 1408601b79..9ad7a79652 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -7,6 +7,7 @@ import logging
 import uuid
 import json
 import six
+import random
 
 from sets import Set
 # Django
@@ -265,8 +266,16 @@ class TaskManager():
         elif not task.supports_isolation() and rampart_group.controller_id:
             # non-Ansible jobs on isolated instances run on controller
             task.instance_group = rampart_group.controller
-            logger.info('Submitting isolated %s to queue %s via %s.',
-                        task.log_format, task.instance_group_id, rampart_group.controller_id)
+            task.execution_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True)))
+            logger.info(six.text_type('Submitting isolated {} to queue {} on node {}.').format(
+                        task.log_format, task.instance_group.name, task.execution_node))
+        elif task.supports_isolation() and rampart_group.controller_id:
+            # TODO: select from only the online nodes in the controller group
+            task.instance_group = rampart_group
+            task.execution_node = instance.hostname
+            task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True)))
+            logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format(
+                        task.log_format, task.execution_node, task.controller_node))
         else:
             task.instance_group = rampart_group
             if instance is not None:
@@ -284,11 +293,10 @@ class TaskManager():
 
         def post_commit():
             task.websocket_emit_status(task.status)
             if task.status != 'failed':
-                if instance is not None:
-                    actual_queue=instance.hostname
-                else:
-                    actual_queue=settings.CELERY_DEFAULT_QUEUE
-                task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue)
+                task.start_celery_task(opts,
+                                       error_callback=error_handler,
+                                       success_callback=success_handler,
+                                       queue=task.get_celery_queue_name())
 
         connection.on_commit(post_commit)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index d1a06d2af7..e9ff60b0da 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -868,15 +868,10 @@ class BaseTask(Task):
     '''
 
     @with_path_cleanup
-    def run(self, pk, isolated_host=None, **kwargs):
+    def run(self, pk, **kwargs):
        '''
         Run the job/task and capture its output.
         '''
-        '''
-        execution_node = settings.CLUSTER_HOST_ID
-        if isolated_host is not None:
-            execution_node = isolated_host
-        '''
         instance = self.update_model(pk, status='running', start_args='')  # blank field to remove encrypted passwords
 
         instance.websocket_emit_status("running")
@@ -886,6 +881,8 @@ class BaseTask(Task):
         extra_update_fields = {}
         event_ct = 0
         stdout_handle = None
+        isolated_host = instance.get_isolated_execution_node_name()
+
         try:
             kwargs['isolated'] = isolated_host is not None
             self.pre_run_hook(instance, **kwargs)
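For reference, the scheduler's new routing logic in the task_manager.py hunk reduces to three cases. The sketch below restates them as a pure function (the names and the dict return shape are invented for illustration; the real code mutates the task and its instance_group in place, and node selection should eventually be limited to online controllers, per the TODO):

import random

def route(supports_isolation, is_isolated_group, controller_hostnames,
          chosen_hostname):
    """Toy restatement of the three scheduler branches; not AWX code."""
    if is_isolated_group and not supports_isolation:
        # Project/inventory updates and system jobs cannot run inside the
        # isolated environment, so they execute on a controller node.
        return {'execution_node': random.choice(controller_hostnames),
                'controller_node': ''}
    if is_isolated_group and supports_isolation:
        # Jobs and ad hoc commands run on the isolated node itself; a
        # randomly chosen controller node manages the environment.
        return {'execution_node': chosen_hostname,
                'controller_node': random.choice(controller_hostnames)}
    # Ordinary instance group: run on the assigned instance.
    return {'execution_node': chosen_hostname, 'controller_node': ''}

# Example: an isolation-capable job targeted at isolated node 'iso-1',
# whose group is controlled by 'tower-1' and 'tower-2'.
print(route(True, True, ['tower-1', 'tower-2'], 'iso-1'))
# e.g. {'execution_node': 'iso-1', 'controller_node': 'tower-2'}

Combined with the unified_jobs.py helpers, this is what lets BaseTask.run drop its isolated_host argument: the execution and controller nodes are persisted on the job row at schedule time, and the task re-derives isolated_host from them when it starts.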