mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

all dependent jobs must finish before starting job

related to https://github.com/ansible/ansible-tower/issues/6570 and https://github.com/ansible/ansible-tower/issues/6489

* Ensure that all of a job's dependent jobs have finished (i.e. error,
success, failed) before starting the job.
* Fixes a bug where too small a set of dependent jobs was chosen to fail
when an update_on_launch job failed.
* Fixes the bug of jobs starting before the dependent project updates
created via update_on_launch have finished, along with similar bugs
associated with inventory updates.
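
A minimal, self-contained Python sketch of the first point, the gating rule added via TaskManagerJobMixin.dependent_jobs_finished(); FakeJob and its attributes are invented stand-ins for illustration, not the real AWX models:

# Sketch only (assumed stand-in class, not the actual AWX model): a job may
# start once every job it depends on has left the pending/waiting/running states.
class FakeJob:
    """Invented stand-in for a unified job; only status and dependent_jobs matter."""
    def __init__(self, name, status='pending', dependent_jobs=None):
        self.name = name
        self.status = status
        self.dependent_jobs = dependent_jobs or []

    def dependent_jobs_finished(self):
        # Mirrors the check TaskManagerJobMixin.dependent_jobs_finished performs
        # in the diff below, against an in-memory list instead of the ORM.
        return all(dep.status not in ('pending', 'waiting', 'running')
                   for dep in self.dependent_jobs)

project_update = FakeJob('project update', status='running')
job = FakeJob('deploy', dependent_jobs=[project_update])
assert not job.dependent_jobs_finished()   # scheduler keeps the job blocked

project_update.status = 'successful'
assert job.dependent_jobs_finished()       # job is now eligible to start
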
Chris Meyers 2017-07-24 12:31:35 -04:00
parent f175fbba23
commit 525490a9a0
10 changed files with 133 additions and 88 deletions
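
The diff below also reworks cancellation: UnifiedJob.cancel() gains an is_chain flag so that a cancel fans out to the canceled task's fail chain exactly once instead of each chained job re-walking it. A hedged sketch of that behaviour, with SketchJob as an invented stand-in rather than the real model:

class SketchJob:
    """Invented stand-in for a unified job; tracks only a cancel flag and a fail chain."""
    def __init__(self, name, fail_chain=None):
        self.name = name
        self.canceled = False
        self.fail_chain = fail_chain or []   # jobs queued behind this one

    def get_jobs_fail_chain(self):
        return self.fail_chain

    def cancel(self, job_explanation=None, is_chain=False):
        if not is_chain:
            # Only the task where the cancel originated walks the chain;
            # is_chain=True keeps the chained jobs from walking it again.
            for other in self.get_jobs_fail_chain():
                other.cancel(job_explanation='Previous Task Canceled', is_chain=True)
        self.canceled = True
        return True

deploy = SketchJob('deploy job')
update = SketchJob('project update', fail_chain=[deploy])
update.cancel()
assert update.canceled and deploy.canceled
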

View File

@ -29,8 +29,7 @@ from awx.main.fields import (
from awx.main.managers import HostManager
from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.jobs import Job
from awx.main.models.mixins import ResourceMixin
from awx.main.models.mixins import ResourceMixin, TaskManagerInventoryUpdateMixin
from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin,
@ -1391,7 +1390,7 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
return source
class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, TaskManagerInventoryUpdateMixin):
'''
Internal job for tracking inventory updates from external sources.
'''
@ -1508,20 +1507,9 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
return self.global_instance_groups
return selected_groups
def _build_job_explanation(self):
if not self.job_explanation:
return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(self.model_to_str(), self.name, self.id)
return None
def get_dependent_jobs(self):
return Job.objects.filter(dependent_jobs__in=[self.id])
def cancel(self, job_explanation=None):
res = super(InventoryUpdate, self).cancel(job_explanation=job_explanation)
def cancel(self, job_explanation=None, is_chain=False):
res = super(InventoryUpdate, self).cancel(job_explanation=job_explanation, is_chain=is_chain)
if res:
map(lambda x: x.cancel(job_explanation=self._build_job_explanation()), self.get_dependent_jobs())
if self.launch_type != 'scm' and self.source_project_update:
self.source_project_update.cancel(job_explanation=job_explanation)
return res

View File

@ -38,7 +38,7 @@ from awx.main.utils import (
parse_yaml_or_json,
)
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin
from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin, TaskManagerJobMixin
from awx.main.models.base import PERM_INVENTORY_SCAN
from awx.main.fields import JSONField
@ -449,7 +449,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
return dict(error=list(error_notification_templates), success=list(success_notification_templates), any=list(any_notification_templates))
class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskManagerJobMixin):
'''
A job applies a project (with playbook) to an inventory source with a given
credential. It represents a single invocation of ansible-playbook with the
@ -709,16 +709,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
def get_notification_friendly_name(self):
return "Job"
'''
Canceling a job also cancels the implicit project update with launch_type
run.
'''
def cancel(self, job_explanation=None):
res = super(Job, self).cancel(job_explanation=job_explanation)
if self.project_update:
self.project_update.cancel(job_explanation=job_explanation)
return res
@property
def memcached_fact_key(self):
return '{}'.format(self.inventory.id)

View File

@ -16,7 +16,9 @@ from awx.main.utils import parse_yaml_or_json
from awx.main.fields import JSONField
__all__ = ['ResourceMixin', 'SurveyJobTemplateMixin', 'SurveyJobMixin']
__all__ = ['ResourceMixin', 'SurveyJobTemplateMixin', 'SurveyJobMixin',
'TaskManagerUnifiedJobMixin', 'TaskManagerJobMixin', 'TaskManagerProjectUpdateMixin',
'TaskManagerInventoryUpdateMixin',]
class ResourceMixin(models.Model):
@ -249,3 +251,45 @@ class SurveyJobMixin(models.Model):
return json.dumps(extra_vars)
else:
return self.extra_vars
class TaskManagerUnifiedJobMixin(models.Model):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
return []
def dependent_jobs_finished(self):
return True
class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
class Meta:
abstract = True
def dependent_jobs_finished(self):
for j in self.dependent_jobs.all():
if j.status in ['pending', 'waiting', 'running']:
return False
return True
class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
return list(self.dependent_jobs.all())
class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta:
abstract = True
class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta:
abstract = True

View File

@ -23,7 +23,7 @@ from awx.main.models.notifications import (
JobNotificationMixin,
)
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.mixins import ResourceMixin
from awx.main.models.mixins import ResourceMixin, TaskManagerProjectUpdateMixin
from awx.main.utils import update_scm_url
from awx.main.utils.ansible import skip_directory, could_be_inventory, could_be_playbook
from awx.main.fields import ImplicitRoleField
@ -430,7 +430,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
return reverse('api:project_detail', kwargs={'pk': self.pk}, request=request)
class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManagerProjectUpdateMixin):
'''
Internal job for tracking project updates from SCM.
'''
@ -512,8 +512,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
update_fields.append('scm_delete_on_next_update')
parent_instance.save(update_fields=update_fields)
def cancel(self, job_explanation=None):
res = super(ProjectUpdate, self).cancel(job_explanation=job_explanation)
def cancel(self, job_explanation=None, is_chain=False):
res = super(ProjectUpdate, self).cancel(job_explanation=job_explanation, is_chain=is_chain)
if res and self.launch_type != 'sync':
for inv_src in self.scm_inventory_updates.filter(status='running'):
inv_src.cancel(job_explanation='Source project update `{}` was canceled.'.format(self.name))

View File

@ -30,7 +30,7 @@ from djcelery.models import TaskMeta
# AWX
from awx.main.models.base import * # noqa
from awx.main.models.schedules import Schedule
from awx.main.models.mixins import ResourceMixin
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin
from awx.main.utils import (
decrypt_field, _inventory_updates,
copy_model_by_class, copy_m2m_relationships
@ -414,7 +414,7 @@ class UnifiedJobTypeStringMixin(object):
return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__)
class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin):
class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
'''
Concrete base class for unified job run by the task engine.
'''
@ -1058,8 +1058,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if settings.DEBUG:
raise
def cancel(self, job_explanation=None):
def _build_job_explanation(self):
if not self.job_explanation:
return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(self.model_to_str(), self.name, self.id)
return None
def cancel(self, job_explanation=None, is_chain=False):
if self.can_cancel:
if not is_chain:
map(lambda x: x.cancel(job_explanation=self._build_job_explanation(), is_chain=True), self.get_jobs_fail_chain())
if not self.cancel_flag:
self.cancel_flag = True
cancel_fields = ['cancel_flag']

View File

@ -47,6 +47,10 @@ class TaskManager():
for g in self.graph:
if self.graph[g]['graph'].is_job_blocked(task):
return True
if not task.dependent_jobs_finished():
return True
return False
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
@ -262,11 +266,12 @@ class TaskManager():
return inventory_task
def capture_chain_failure_dependencies(self, task, dependencies):
for dep in dependencies:
with disable_activity_stream():
logger.info('Adding unified job {} to list of dependencies of {}.'.format(task.id, dep.id))
dep.dependent_jobs.add(task.id)
dep.save()
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
for dep in dependencies:
# Add task + all deps except self
dep.dependent_jobs.add(*([task] + filter(lambda d: d != dep, dependencies)))
def should_update_inventory_source(self, job, inventory_source):
now = tz_now()
@ -342,7 +347,9 @@ class TaskManager():
if self.should_update_inventory_source(task, inventory_source):
inventory_task = self.create_inventory_update(task, inventory_source)
dependencies.append(inventory_task)
self.capture_chain_failure_dependencies(task, dependencies)
if len(dependencies) > 0:
self.capture_chain_failure_dependencies(task, dependencies)
return dependencies
def process_dependencies(self, dependent_task, dependency_tasks):
@ -359,7 +366,9 @@ class TaskManager():
if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, dependency_tasks)
tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
tasks_to_fail += [dependent_task]
self.start_task(task, rampart_group, tasks_to_fail)
found_acceptable_queue = True
if not found_acceptable_queue:
logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
@ -379,7 +388,7 @@ class TaskManager():
if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting task {} in group {}".format(task, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group)
self.start_task(task, rampart_group, task.get_jobs_fail_chain())
found_acceptable_queue = True
break
if not found_acceptable_queue:

View File

@ -314,7 +314,7 @@ def handle_work_error(self, task_id, subtasks=None):
first_instance = instance
first_instance_type = each_task['type']
if instance.celery_task_id != task_id:
if instance.celery_task_id != task_id and not instance.cancel_flag:
instance.status = 'failed'
instance.failed = True
if not instance.job_explanation:
@ -1398,11 +1398,12 @@ class RunProjectUpdate(BaseTask):
def get_stdout_handle(self, instance):
stdout_handle = super(RunProjectUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = ProjectUpdate.objects.get(pk=instance.pk)
instance_actual.result_stdout_text += data
instance_actual.save()
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
@ -1872,11 +1873,12 @@ class RunInventoryUpdate(BaseTask):
def get_stdout_handle(self, instance):
stdout_handle = super(RunInventoryUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = InventoryUpdate.objects.get(pk=instance.pk)
instance_actual.result_stdout_text += data
instance_actual.save()
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_cwd(self, inventory_update, **kwargs):
@ -2138,11 +2140,12 @@ class RunSystemJob(BaseTask):
def get_stdout_handle(self, instance):
stdout_handle = super(RunSystemJob, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = SystemJob.objects.get(pk=instance.pk)
instance_actual.result_stdout_text += data
instance_actual.save()
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_env(self, instance, **kwargs):

View File

@ -29,7 +29,7 @@ def test_multi_group_basic_job_launch(instance_factory, default_instance_group,
mock_task_impact.return_value = 500
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)])
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, []), mock.call(j2, ig2, [])])
@ -63,13 +63,26 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
pu = p.project_updates.first()
TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [pu])
TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j1])
pu.finished = pu.created + timedelta(seconds=1)
pu.status = "successful"
pu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)])
TaskManager.start_task.assert_called_once_with(j1, ig1, [])
j1.finished = j1.created + timedelta(seconds=2)
j1.status = "successful"
j1.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
pu = p.project_updates.last()
TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j2])
pu.finished = pu.created + timedelta(seconds=1)
pu.status = "successful"
pu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, ig2, [])
@pytest.mark.django_db
@ -114,8 +127,8 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig1),
mock.call(j2, ig2)])
mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig1, []),
mock.call(j2, ig2, [])])
assert mock_job.call_count == 3
@ -146,5 +159,5 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker,
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig2)])
mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig2, [])])
assert mock_job.call_count == 2

View File

@ -17,8 +17,7 @@ def test_single_job_scheduler_launch(default_instance_group, job_template_factor
j.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
assert TaskManager.start_task.called
assert TaskManager.start_task.call_args == ((j, default_instance_group),)
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
@pytest.mark.django_db
@ -34,12 +33,12 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, default_instance_group)
TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [])
j1.status = "successful"
j1.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, default_instance_group)
TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [])
@pytest.mark.django_db
@ -60,8 +59,8 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group,
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group),
mock.call(j2, default_instance_group)])
TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, []),
mock.call(j2, default_instance_group, [])])
@pytest.mark.django_db
@ -83,12 +82,12 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j1, default_instance_group)
mock_job.assert_called_once_with(j1, default_instance_group, [])
j1.status = "successful"
j1.save()
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j2, default_instance_group)
mock_job.assert_called_once_with(j2, default_instance_group, [])
@ -113,12 +112,12 @@ def test_single_job_dependencies_project_launch(default_instance_group, job_temp
mock_pu.assert_called_once_with(j)
pu = [x for x in p.project_updates.all()]
assert len(pu) == 1
TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [pu[0]])
TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j])
pu[0].status = "successful"
pu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group)
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
@pytest.mark.django_db
@ -143,12 +142,12 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group,
mock_iu.assert_called_once_with(j, ii)
iu = [x for x in ii.inventory_updates.all()]
assert len(iu) == 1
TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [iu[0]])
TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j])
iu[0].status = "successful"
iu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group)
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
@pytest.mark.django_db
@ -181,8 +180,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
TaskManager().schedule()
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [pu, iu]),
mock.call(iu, default_instance_group, [pu, iu])])
TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1]),
mock.call(iu, default_instance_group, [pu, j1])])
pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1)
pu.save()
@ -191,12 +190,12 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, default_instance_group)
TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [])
j1.status = "successful"
j1.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, default_instance_group)
TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [])
pu = [x for x in p.project_updates.all()]
iu = [x for x in ii.inventory_updates.all()]
assert len(pu) == 1

View File

@ -7,33 +7,23 @@ from django.core.exceptions import ValidationError
from awx.main.models import (
UnifiedJob,
InventoryUpdate,
Job,
Inventory,
Credential,
CredentialType,
)
@pytest.fixture
def dependent_job(mocker):
j = Job(id=3, name='I_am_a_job')
j.cancel = mocker.MagicMock(return_value=True)
return [j]
def test_cancel(mocker, dependent_job):
def test_cancel(mocker):
with mock.patch.object(UnifiedJob, 'cancel', return_value=True) as parent_cancel:
iu = InventoryUpdate()
iu.get_dependent_jobs = mocker.MagicMock(return_value=dependent_job)
iu.save = mocker.MagicMock()
build_job_explanation_mock = mocker.MagicMock()
iu._build_job_explanation = mocker.MagicMock(return_value=build_job_explanation_mock)
iu.cancel()
parent_cancel.assert_called_with(job_explanation=None)
dependent_job[0].cancel.assert_called_with(job_explanation=build_job_explanation_mock)
parent_cancel.assert_called_with(is_chain=False, job_explanation=None)
def test__build_job_explanation():