
conform isolated system to new early node choice

* Randomly choose an instance in the controller instance group to
  control the isolated node run, and record the chosen instance in the
  job's controller_node field (a sketch of this selection rule follows
  the change summary below).
chris meyers 2018-05-25 14:53:24 -04:00
parent e720fe5dd0
commit 8d352a4edf
5 changed files with 57 additions and 27 deletions
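
The core of the change is small: when a job is submitted to an isolated instance group, the task manager now picks one hostname at random from the controlling group's instances up front and records it on the job. A minimal standalone sketch of that selection rule (the pick_controller_node helper and hostnames here are illustrative, not part of the commit; the real code draws from rampart_group.controller.instances, as the TaskManager hunk below shows):

    import random

    def pick_controller_node(controller_hostnames):
        # Randomly choose one instance from the controller instance group; the
        # chosen hostname is what gets persisted on the job's controller_node field.
        hostnames = list(controller_hostnames)
        if not hostnames:
            raise ValueError('controller instance group has no instances')
        return random.choice(hostnames)

    # Example: one of three controller instances is picked to manage an isolated run.
    print(pick_controller_node(['awx-ctrl-1', 'awx-ctrl-2', 'awx-ctrl-3']))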

View File

@@ -702,7 +702,8 @@ class UnifiedJobSerializer(BaseSerializer):
         model = UnifiedJob
         fields = ('*', 'unified_job_template', 'launch_type', 'status',
                   'failed', 'started', 'finished', 'elapsed', 'job_args',
-                  'job_cwd', 'job_env', 'job_explanation', 'execution_node',
+                  'job_cwd', 'job_env', 'job_explanation',
+                  'execution_node', 'controller_node',
                   'result_traceback', 'event_processing_finished')
         extra_kwargs = {
             'unified_job_template': {
@@ -3434,7 +3435,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
     class Meta:
         model = WorkflowJob
         fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',
-                  '-execution_node', '-event_processing_finished',)
+                  '-execution_node', '-event_processing_finished', '-controller_node',)

     def get_related(self, obj):
         res = super(WorkflowJobSerializer, self).get_related(obj)
@@ -3463,7 +3464,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
 class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer):

     class Meta:
-        fields = ('*', '-execution_node',)
+        fields = ('*', '-execution_node', '-controller_node',)


 class WorkflowJobCancelSerializer(WorkflowJobSerializer):

View File

@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.11 on 2018-05-25 18:58
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0039_v330_custom_venv_help_text'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='controller_node',
+            field=models.TextField(blank=True, default=b'', editable=False, help_text='The instance that managed the isolated execution environment.'),
+        ),
+    ]

View File

@@ -507,7 +507,8 @@ class StdoutMaxBytesExceeded(Exception):
         self.supported = supported


-class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
+class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique,
+                 UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
     '''
     Concrete base class for unified job run by the task engine.
     '''
@@ -571,6 +572,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         editable=False,
         help_text=_("The node the job executed on."),
     )
+    controller_node = models.TextField(
+        blank=True,
+        default='',
+        editable=False,
+        help_text=_("The instance that managed the isolated execution environment."),
+    )
     notifications = models.ManyToManyField(
         'Notification',
         editable=False,
@@ -1228,17 +1235,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             raise RuntimeError("Expected celery_task_id to be set on model.")
         kwargs['task_id'] = self.celery_task_id
         task_class = self._get_task_class()
-        args = [self.pk]
-        from awx.main.models.ha import InstanceGroup
-        ig = InstanceGroup.objects.get(name=queue)
-        if ig.controller_id:
-            if self.supports_isolation():  # case of jobs and ad hoc commands
-                isolated_instance = ig.instances.order_by('-capacity').first()
-                args.append(isolated_instance.hostname)
-            else:  # proj & inv updates, system jobs run on controller
-                queue = ig.controller.name
         kwargs['queue'] = queue
-        task_class().apply_async(args, opts, **kwargs)
+        task_class().apply_async([self.pk], opts, **kwargs)

     def start(self, error_callback, success_callback, **kwargs):
         '''
@@ -1400,3 +1398,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             r['{}_schedule_id'.format(name)] = self.schedule.pk
             r['{}_schedule_name'.format(name)] = self.schedule.name
         return r
+
+    def get_celery_queue_name(self):
+        return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE
+
+    def get_isolated_execution_node_name(self):
+        return self.execution_node if self.controller_node else None
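
The two helpers added above encode the routing rule that replaces the late queue lookup removed from start_celery_task: when controller_node is set, the run is isolated, the Celery message goes to the controller's queue, and execution_node names the isolated host; otherwise the job's own execution_node (or the default queue) wins. A self-contained sketch of the rule, assuming a namedtuple stand-in for UnifiedJob and 'tower' as a stand-in for settings.CELERY_DEFAULT_QUEUE:

    from collections import namedtuple

    # Stand-in for UnifiedJob; only the two routing fields matter here.
    Job = namedtuple('Job', ['controller_node', 'execution_node'])

    def celery_queue_name(job, default='tower'):
        # controller_node wins: isolated runs are driven from the controller's queue.
        return job.controller_node or job.execution_node or default

    def isolated_execution_node(job):
        # Only meaningful for isolated runs, i.e. when a controller was chosen.
        return job.execution_node if job.controller_node else None

    isolated = Job(controller_node='awx-ctrl-1', execution_node='isolated-1')
    regular = Job(controller_node='', execution_node='awx-2')
    assert celery_queue_name(isolated) == 'awx-ctrl-1'
    assert isolated_execution_node(isolated) == 'isolated-1'
    assert celery_queue_name(regular) == 'awx-2'
    assert isolated_execution_node(regular) is None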

View File

@@ -7,6 +7,7 @@ import logging
 import uuid
 import json
 import six
+import random
 from sets import Set

 # Django
@@ -265,8 +266,16 @@ class TaskManager():
             elif not task.supports_isolation() and rampart_group.controller_id:
                 # non-Ansible jobs on isolated instances run on controller
                 task.instance_group = rampart_group.controller
-                logger.info('Submitting isolated %s to queue %s via %s.',
-                            task.log_format, task.instance_group_id, rampart_group.controller_id)
+                task.execution_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True)))
+                logger.info(six.text_type('Submitting isolated {} to queue {}.').format(
+                            task.log_format, task.instance_group.name, task.execution_node))
+            elif task.supports_isolation() and rampart_group.controller_id:
+                # TODO: Select from only online nodes in the controller node
+                task.instance_group = rampart_group
+                task.execution_node = instance.hostname
+                task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True)))
+                logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format(
+                            task.log_format, task.execution_node, task.controller_node))
             else:
                 task.instance_group = rampart_group
                 if instance is not None:
@@ -284,11 +293,10 @@ class TaskManager():

         def post_commit():
             task.websocket_emit_status(task.status)
             if task.status != 'failed':
-                if instance is not None:
-                    actual_queue=instance.hostname
-                else:
-                    actual_queue=settings.CELERY_DEFAULT_QUEUE
-                task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue)
+                task.start_celery_task(opts,
+                                       error_callback=error_handler,
+                                       success_callback=success_handler,
+                                       queue=task.get_celery_queue_name())

         connection.on_commit(post_commit)
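
Condensing the TaskManager hunk above, the routing decision is three-way. The sketch below restates it with comments; the .hostnames attribute and SimpleNamespace objects are illustrative stand-ins for the ORM query values_list('hostname', flat=True) and the real models:

    import random
    from types import SimpleNamespace as NS

    def route_task(task, rampart_group, instance):
        # Sketch of the scheduler routing after this commit (not the real code).
        if rampart_group.controller_id and not task.supports_isolation():
            # Non-Ansible work (project/inventory updates, system jobs) submitted
            # to an isolated group actually runs on its controller group.
            task.instance_group = rampart_group.controller
            task.execution_node = random.choice(rampart_group.controller.hostnames)
        elif rampart_group.controller_id and task.supports_isolation():
            # Jobs and ad hoc commands execute on the isolated node itself, while a
            # randomly chosen controller instance manages the run via controller_node.
            task.instance_group = rampart_group
            task.execution_node = instance.hostname
            task.controller_node = random.choice(rampart_group.controller.hostnames)
        else:
            # Ordinary instance group: execute on the instance the scheduler picked.
            task.instance_group = rampart_group
            if instance is not None:
                task.execution_node = instance.hostname

    # Example: an isolated-capable job lands on the isolated node, managed remotely.
    controller = NS(name='controllers', hostnames=['awx-ctrl-1', 'awx-ctrl-2'])
    group = NS(name='isolated', controller_id=1, controller=controller)
    job = NS(supports_isolation=lambda: True, controller_node='', execution_node='')
    route_task(job, group, NS(hostname='isolated-1'))
    print(job.execution_node, job.controller_node)  # isolated-1 awx-ctrl-<n>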

View File

@@ -868,15 +868,10 @@ class BaseTask(Task):
     '''

     @with_path_cleanup
-    def run(self, pk, isolated_host=None, **kwargs):
+    def run(self, pk, **kwargs):
         '''
         Run the job/task and capture its output.
         '''
-        '''
-        execution_node = settings.CLUSTER_HOST_ID
-        if isolated_host is not None:
-            execution_node = isolated_host
-        '''
         instance = self.update_model(pk, status='running',
                                      start_args='')  # blank field to remove encrypted passwords
@@ -886,6 +881,8 @@ class BaseTask(Task):
         extra_update_fields = {}
         event_ct = 0
         stdout_handle = None
+
+        isolated_host = instance.get_isolated_execution_node_name()
         try:
             kwargs['isolated'] = isolated_host is not None
             self.pre_run_hook(instance, **kwargs)