diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index e3b71ee594..c3c587dd91 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -29,8 +29,7 @@ from awx.main.fields import ( from awx.main.managers import HostManager from awx.main.models.base import * # noqa from awx.main.models.unified_jobs import * # noqa -from awx.main.models.jobs import Job -from awx.main.models.mixins import ResourceMixin +from awx.main.models.mixins import ResourceMixin, TaskManagerInventoryUpdateMixin from awx.main.models.notifications import ( NotificationTemplate, JobNotificationMixin, @@ -1391,7 +1390,7 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): return source -class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin): +class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, TaskManagerInventoryUpdateMixin): ''' Internal job for tracking inventory updates from external sources. ''' @@ -1508,20 +1507,9 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin): return self.global_instance_groups return selected_groups - def _build_job_explanation(self): - if not self.job_explanation: - return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ - (self.model_to_str(), self.name, self.id) - return None - - def get_dependent_jobs(self): - return Job.objects.filter(dependent_jobs__in=[self.id]) - - def cancel(self, job_explanation=None): - - res = super(InventoryUpdate, self).cancel(job_explanation=job_explanation) + def cancel(self, job_explanation=None, is_chain=False): + res = super(InventoryUpdate, self).cancel(job_explanation=job_explanation, is_chain=is_chain) if res: - map(lambda x: x.cancel(job_explanation=self._build_job_explanation()), self.get_dependent_jobs()) if self.launch_type != 'scm' and self.source_project_update: self.source_project_update.cancel(job_explanation=job_explanation) return res diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index a32cb1dae8..7a95252c54 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -38,7 +38,7 @@ from awx.main.utils import ( parse_yaml_or_json, ) from awx.main.fields import ImplicitRoleField -from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin +from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin, TaskManagerJobMixin from awx.main.models.base import PERM_INVENTORY_SCAN from awx.main.fields import JSONField @@ -449,7 +449,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour return dict(error=list(error_notification_templates), success=list(success_notification_templates), any=list(any_notification_templates)) -class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): +class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskManagerJobMixin): ''' A job applies a project (with playbook) to an inventory source with a given credential. It represents a single invocation of ansible-playbook with the @@ -709,16 +709,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): def get_notification_friendly_name(self): return "Job" - ''' - Canceling a job also cancels the implicit project update with launch_type - run. - ''' - def cancel(self, job_explanation=None): - res = super(Job, self).cancel(job_explanation=job_explanation) - if self.project_update: - self.project_update.cancel(job_explanation=job_explanation) - return res - @property def memcached_fact_key(self): return '{}'.format(self.inventory.id) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index f767586eff..5ec4a4337d 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -16,7 +16,9 @@ from awx.main.utils import parse_yaml_or_json from awx.main.fields import JSONField -__all__ = ['ResourceMixin', 'SurveyJobTemplateMixin', 'SurveyJobMixin'] +__all__ = ['ResourceMixin', 'SurveyJobTemplateMixin', 'SurveyJobMixin', + 'TaskManagerUnifiedJobMixin', 'TaskManagerJobMixin', 'TaskManagerProjectUpdateMixin', + 'TaskManagerInventoryUpdateMixin',] class ResourceMixin(models.Model): @@ -249,3 +251,45 @@ class SurveyJobMixin(models.Model): return json.dumps(extra_vars) else: return self.extra_vars + + +class TaskManagerUnifiedJobMixin(models.Model): + class Meta: + abstract = True + + def get_jobs_fail_chain(self): + return [] + + def dependent_jobs_finished(self): + return True + + +class TaskManagerJobMixin(TaskManagerUnifiedJobMixin): + class Meta: + abstract = True + + def dependent_jobs_finished(self): + for j in self.dependent_jobs.all(): + if j.status in ['pending', 'waiting', 'running']: + return False + return True + + +class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): + class Meta: + abstract = True + + def get_jobs_fail_chain(self): + return list(self.dependent_jobs.all()) + + +class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin): + class Meta: + abstract = True + + +class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): + class Meta: + abstract = True + + diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index bc0d4d16ed..827b131aed 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -23,7 +23,7 @@ from awx.main.models.notifications import ( JobNotificationMixin, ) from awx.main.models.unified_jobs import * # noqa -from awx.main.models.mixins import ResourceMixin +from awx.main.models.mixins import ResourceMixin, TaskManagerProjectUpdateMixin from awx.main.utils import update_scm_url from awx.main.utils.ansible import skip_directory, could_be_inventory, could_be_playbook from awx.main.fields import ImplicitRoleField @@ -430,7 +430,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): return reverse('api:project_detail', kwargs={'pk': self.pk}, request=request) -class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin): +class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManagerProjectUpdateMixin): ''' Internal job for tracking project updates from SCM. ''' @@ -512,8 +512,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin): update_fields.append('scm_delete_on_next_update') parent_instance.save(update_fields=update_fields) - def cancel(self, job_explanation=None): - res = super(ProjectUpdate, self).cancel(job_explanation=job_explanation) + def cancel(self, job_explanation=None, is_chain=False): + res = super(ProjectUpdate, self).cancel(job_explanation=job_explanation, is_chain=is_chain) if res and self.launch_type != 'sync': for inv_src in self.scm_inventory_updates.filter(status='running'): inv_src.cancel(job_explanation='Source project update `{}` was canceled.'.format(self.name)) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index a763239d15..dadb9d2b65 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -30,7 +30,7 @@ from djcelery.models import TaskMeta # AWX from awx.main.models.base import * # noqa from awx.main.models.schedules import Schedule -from awx.main.models.mixins import ResourceMixin +from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin from awx.main.utils import ( decrypt_field, _inventory_updates, copy_model_by_class, copy_m2m_relationships @@ -414,7 +414,7 @@ class UnifiedJobTypeStringMixin(object): return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__) -class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin): +class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin): ''' Concrete base class for unified job run by the task engine. ''' @@ -1058,8 +1058,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if settings.DEBUG: raise - def cancel(self, job_explanation=None): + def _build_job_explanation(self): + if not self.job_explanation: + return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ + (self.model_to_str(), self.name, self.id) + return None + + def cancel(self, job_explanation=None, is_chain=False): if self.can_cancel: + if not is_chain: + map(lambda x: x.cancel(job_explanation=self._build_job_explanation(), is_chain=True), self.get_jobs_fail_chain()) + if not self.cancel_flag: self.cancel_flag = True cancel_fields = ['cancel_flag'] diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 8e0e1fc340..3e83c759d8 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -47,6 +47,10 @@ class TaskManager(): for g in self.graph: if self.graph[g]['graph'].is_job_blocked(task): return True + + if not task.dependent_jobs_finished(): + return True + return False def get_tasks(self, status_list=('pending', 'waiting', 'running')): @@ -262,11 +266,12 @@ class TaskManager(): return inventory_task def capture_chain_failure_dependencies(self, task, dependencies): - for dep in dependencies: - with disable_activity_stream(): - logger.info('Adding unified job {} to list of dependencies of {}.'.format(task.id, dep.id)) - dep.dependent_jobs.add(task.id) - dep.save() + with disable_activity_stream(): + task.dependent_jobs.add(*dependencies) + + for dep in dependencies: + # Add task + all deps except self + dep.dependent_jobs.add(*([task] + filter(lambda d: d != dep, dependencies))) def should_update_inventory_source(self, job, inventory_source): now = tz_now() @@ -342,7 +347,9 @@ class TaskManager(): if self.should_update_inventory_source(task, inventory_source): inventory_task = self.create_inventory_update(task, inventory_source) dependencies.append(inventory_task) - self.capture_chain_failure_dependencies(task, dependencies) + + if len(dependencies) > 0: + self.capture_chain_failure_dependencies(task, dependencies) return dependencies def process_dependencies(self, dependent_task, dependency_tasks): @@ -359,7 +366,9 @@ class TaskManager(): if not self.would_exceed_capacity(task, rampart_group.name): logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name)) self.graph[rampart_group.name]['graph'].add_job(task) - self.start_task(task, rampart_group, dependency_tasks) + tasks_to_fail = filter(lambda t: t != task, dependency_tasks) + tasks_to_fail += [dependent_task] + self.start_task(task, rampart_group, tasks_to_fail) found_acceptable_queue = True if not found_acceptable_queue: logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task)) @@ -379,7 +388,7 @@ class TaskManager(): if not self.would_exceed_capacity(task, rampart_group.name): logger.debug("Starting task {} in group {}".format(task, rampart_group.name)) self.graph[rampart_group.name]['graph'].add_job(task) - self.start_task(task, rampart_group) + self.start_task(task, rampart_group, task.get_jobs_fail_chain()) found_acceptable_queue = True break if not found_acceptable_queue: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5b02e694fb..99d1e86af6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -314,7 +314,7 @@ def handle_work_error(self, task_id, subtasks=None): first_instance = instance first_instance_type = each_task['type'] - if instance.celery_task_id != task_id: + if instance.celery_task_id != task_id and not instance.cancel_flag: instance.status = 'failed' instance.failed = True if not instance.job_explanation: @@ -1398,11 +1398,12 @@ class RunProjectUpdate(BaseTask): def get_stdout_handle(self, instance): stdout_handle = super(RunProjectUpdate, self).get_stdout_handle(instance) + pk = instance.pk def raw_callback(data): - instance_actual = ProjectUpdate.objects.get(pk=instance.pk) - instance_actual.result_stdout_text += data - instance_actual.save() + instance_actual = self.update_model(pk) + result_stdout_text = instance_actual.result_stdout_text + data + self.update_model(pk, result_stdout_text=result_stdout_text) return OutputEventFilter(stdout_handle, raw_callback=raw_callback) def _update_dependent_inventories(self, project_update, dependent_inventory_sources): @@ -1872,11 +1873,12 @@ class RunInventoryUpdate(BaseTask): def get_stdout_handle(self, instance): stdout_handle = super(RunInventoryUpdate, self).get_stdout_handle(instance) + pk = instance.pk def raw_callback(data): - instance_actual = InventoryUpdate.objects.get(pk=instance.pk) - instance_actual.result_stdout_text += data - instance_actual.save() + instance_actual = self.update_model(pk) + result_stdout_text = instance_actual.result_stdout_text + data + self.update_model(pk, result_stdout_text=result_stdout_text) return OutputEventFilter(stdout_handle, raw_callback=raw_callback) def build_cwd(self, inventory_update, **kwargs): @@ -2138,11 +2140,12 @@ class RunSystemJob(BaseTask): def get_stdout_handle(self, instance): stdout_handle = super(RunSystemJob, self).get_stdout_handle(instance) + pk = instance.pk def raw_callback(data): - instance_actual = SystemJob.objects.get(pk=instance.pk) - instance_actual.result_stdout_text += data - instance_actual.save() + instance_actual = self.update_model(pk) + result_stdout_text = instance_actual.result_stdout_text + data + self.update_model(pk, result_stdout_text=result_stdout_text) return OutputEventFilter(stdout_handle, raw_callback=raw_callback) def build_env(self, instance, **kwargs): diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 3b5622d7fd..c81556e091 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -29,7 +29,7 @@ def test_multi_group_basic_job_launch(instance_factory, default_instance_group, mock_task_impact.return_value = 500 with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)]) + TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, []), mock.call(j2, ig2, [])]) @@ -63,13 +63,26 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() pu = p.project_updates.first() - TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [pu]) + TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j1]) pu.finished = pu.created + timedelta(seconds=1) pu.status = "successful" pu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)]) + TaskManager.start_task.assert_called_once_with(j1, ig1, []) + j1.finished = j1.created + timedelta(seconds=2) + j1.status = "successful" + j1.save() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + pu = p.project_updates.last() + TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j2]) + pu.finished = pu.created + timedelta(seconds=1) + pu.status = "successful" + pu.save() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(j2, ig2, []) @pytest.mark.django_db @@ -114,8 +127,8 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig1), - mock.call(j2, ig2)]) + mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig1, []), + mock.call(j2, ig2, [])]) assert mock_job.call_count == 3 @@ -146,5 +159,5 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker, mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig2)]) + mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig2, [])]) assert mock_job.call_count == 2 diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 15646dfe54..cb1d689577 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -17,8 +17,7 @@ def test_single_job_scheduler_launch(default_instance_group, job_template_factor j.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - assert TaskManager.start_task.called - assert TaskManager.start_task.call_args == ((j, default_instance_group),) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) @pytest.mark.django_db @@ -34,12 +33,12 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp j2.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, default_instance_group) + TaskManager.start_task.assert_called_once_with(j1, default_instance_group, []) j1.status = "successful" j1.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, default_instance_group) + TaskManager.start_task.assert_called_once_with(j2, default_instance_group, []) @pytest.mark.django_db @@ -60,8 +59,8 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, j2.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group), - mock.call(j2, default_instance_group)]) + TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, []), + mock.call(j2, default_instance_group, [])]) @pytest.mark.django_db @@ -83,12 +82,12 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j1, default_instance_group) + mock_job.assert_called_once_with(j1, default_instance_group, []) j1.status = "successful" j1.save() with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j2, default_instance_group) + mock_job.assert_called_once_with(j2, default_instance_group, []) @@ -113,12 +112,12 @@ def test_single_job_dependencies_project_launch(default_instance_group, job_temp mock_pu.assert_called_once_with(j) pu = [x for x in p.project_updates.all()] assert len(pu) == 1 - TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [pu[0]]) + TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j]) pu[0].status = "successful" pu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) @pytest.mark.django_db @@ -143,12 +142,12 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1 - TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [iu[0]]) + TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j]) iu[0].status = "successful" iu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) @pytest.mark.django_db @@ -181,8 +180,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() - TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [pu, iu]), - mock.call(iu, default_instance_group, [pu, iu])]) + TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1]), + mock.call(iu, default_instance_group, [pu, j1])]) pu.status = "successful" pu.finished = pu.created + timedelta(seconds=1) pu.save() @@ -191,12 +190,12 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, default_instance_group) + TaskManager.start_task.assert_called_once_with(j1, default_instance_group, []) j1.status = "successful" j1.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, default_instance_group) + TaskManager.start_task.assert_called_once_with(j2, default_instance_group, []) pu = [x for x in p.project_updates.all()] iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 diff --git a/awx/main/tests/unit/models/test_inventory.py b/awx/main/tests/unit/models/test_inventory.py index 4f0d5eddd8..0fa390bceb 100644 --- a/awx/main/tests/unit/models/test_inventory.py +++ b/awx/main/tests/unit/models/test_inventory.py @@ -7,33 +7,23 @@ from django.core.exceptions import ValidationError from awx.main.models import ( UnifiedJob, InventoryUpdate, - Job, Inventory, Credential, CredentialType, ) -@pytest.fixture -def dependent_job(mocker): - j = Job(id=3, name='I_am_a_job') - j.cancel = mocker.MagicMock(return_value=True) - return [j] - - -def test_cancel(mocker, dependent_job): +def test_cancel(mocker): with mock.patch.object(UnifiedJob, 'cancel', return_value=True) as parent_cancel: iu = InventoryUpdate() - iu.get_dependent_jobs = mocker.MagicMock(return_value=dependent_job) iu.save = mocker.MagicMock() build_job_explanation_mock = mocker.MagicMock() iu._build_job_explanation = mocker.MagicMock(return_value=build_job_explanation_mock) iu.cancel() - parent_cancel.assert_called_with(job_explanation=None) - dependent_job[0].cancel.assert_called_with(job_explanation=build_job_explanation_mock) + parent_cancel.assert_called_with(is_chain=False, job_explanation=None) def test__build_job_explanation():