From de82707581bddccc0607bb3ccccc68ceff679278 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Mon, 14 Aug 2017 15:41:33 -0400
Subject: [PATCH] fail all jobs on an offline node

---
 awx/main/scheduler/__init__.py            | 68 +++++++++++--------
 .../task_management/test_scheduler.py     | 68 +++++++++++++------
 awx/main/tests/unit/test_task_manager.py  | 52 ++++++++++++++
 3 files changed, 141 insertions(+), 47 deletions(-)
 create mode 100644 awx/main/tests/unit/test_task_manager.py

diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index f16633cc49..cdd9a55f3b 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -10,7 +10,7 @@ from sets import Set
 # Django
 from django.conf import settings
 from django.core.cache import cache
-from django.db import transaction, connection
+from django.db import transaction, connection, DatabaseError
 from django.utils.translation import ugettext_lazy as _
 from django.utils.timezone import now as tz_now, utc
 from django.db.models import Q
@@ -78,13 +78,9 @@ class TaskManager():
     def get_running_tasks(self):
         execution_nodes = {}
         now = tz_now()
-        jobs = list(UnifiedJob.objects.filter(Q(status='running') |
-                                              (Q(status='waiting', modified__lte=now - timedelta(seconds=60)))))
-        for j in jobs:
-            if j.execution_node in execution_nodes:
-                execution_nodes[j.execution_node].append(j)
-            elif j.execution_node not in execution_nodes:
-                execution_nodes[j.execution_node] = [j]
+        jobs = UnifiedJob.objects.filter(Q(status='running') |
+                                         Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
+        [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
         return execution_nodes
 
     '''
@@ -93,17 +89,21 @@ class TaskManager():
 
    Transform:
    {
        "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [],
-       "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
-           ...
-           "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
-           ...
-       }]
+       "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
+           ...
+           "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
+           ...
+       }, {
+           ...,
+       }, ...]
    }
    to:
    {
-       "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [
+       "ec2-54-204-222-62.compute-1.amazonaws.com": [
            "5238466a-f8c7-43b3-9180-5b78e9da8304",
+           "5238466a-f8c7-43b3-9180-5b78e9da8306",
+           ...
        ]
    }
    '''
@@ -123,12 +123,9 @@ class TaskManager():
                 active_tasks = set()
                 map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
 
-                # queue is of the form celery@myhost.com
+                # celery worker name is of the form celery@myhost.com
                 queue_name = queue.split('@')
-                if len(queue_name) > 1:
-                    queue_name = queue_name[1]
-                else:
-                    queue_name = queue_name[0]
+                queue_name = queue_name[1 if len(queue_name) > 1 else 0]
                 queues[queue_name] = active_tasks
         else:
             if not hasattr(settings, 'CELERY_UNIT_TEST'):
@@ -431,14 +428,27 @@ class TaskManager():
         Only consider failing tasks on instances for which we obtained a task list from celery for.
         '''
-        execution_nodes_jobs = self.get_running_tasks()
-        for node, node_jobs in execution_nodes_jobs.iteritems():
-            if node not in active_queues:
-                continue
-            active_tasks = active_queues[node]
+        running_tasks = self.get_running_tasks()
+        for node, node_jobs in running_tasks.iteritems():
+            if node in active_queues:
+                active_tasks = active_queues[node]
+            else:
+                '''
+                Node task list not found in celery. If tower thinks the node is down
+                then fail all the jobs on the node.
+                '''
+                try:
+                    instance = Instance.objects.get(hostname=node)
+                    if instance.capacity == 0:
+                        active_tasks = []
+                    else:
+                        continue
+                except Instance.DoesNotExist:
+                    logger.error("Execution node Instance {} not found in database. "
+                                 "The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs]))
+                    active_tasks = []
             for task in node_jobs:
                 if (task.celery_task_id not in active_tasks and
                         not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
-                    # TODO: try catch the getting of the job. The job COULD have been deleted
                     if isinstance(task, WorkflowJob):
                         continue
                     if task.modified > celery_task_start_time:
@@ -448,10 +458,14 @@ class TaskManager():
                         'Task was marked as running in Tower but was not present in',
                         'Celery, so it has been marked as failed.',
                     ))
-                    task.save()
+                    try:
+                        task.save(update_fields=['status', 'job_explanation'])
+                    except DatabaseError:
+                        logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
+                        continue
                     awx_tasks._send_notification_templates(task, 'failed')
                     task.websocket_emit_status('failed')
-                    logger.error("Task %s appears orphaned... marking as failed" % task)
+                    logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
 
     def calculate_capacity_used(self, tasks):
         for rampart_group in self.graph:
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index a9fd694bce..335505a65e 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -8,6 +8,7 @@ from django.utils.timezone import now as tz_now
 from awx.main.scheduler import TaskManager
 from awx.main.models import (
     Job,
+    Instance,
 )
 
 
@@ -223,21 +224,33 @@ class TestReaper():
     def all_jobs(self, mocker):
         now = tz_now()
 
+        Instance.objects.create(hostname='host1', capacity=100)
+        Instance.objects.create(hostname='host2', capacity=100)
+        Instance.objects.create(hostname='host3_split', capacity=100)
+        Instance.objects.create(hostname='host4_offline', capacity=0)
+
         j1 = Job.objects.create(status='pending', execution_node='host1')
         j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1')
         j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1')
         j3.modified = now - timedelta(seconds=60)
         j3.save(update_fields=['modified'])
         j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1')
-        j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host2')
+        j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host1')
         j5.modified = now - timedelta(seconds=60)
         j5.save(update_fields=['modified'])
-        j6 = Job.objects.create(status='waiting', celery_task_id='host2_j6', execution_node='host2_split')
+        j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6', execution_node='host2')
         j6.modified = now - timedelta(seconds=60)
         j6.save(update_fields=['modified'])
-        j7 = Job.objects.create(status='running', celery_task_id='host2_j6', execution_node='host2_split')
+        j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2')
+        j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2')
+        j9 = Job.objects.create(status='waiting', celery_task_id='host3_j8', execution_node='host3_split')
+        j9.modified = now - timedelta(seconds=60)
+        j9.save(update_fields=['modified'])
+        j10 = Job.objects.create(status='running', execution_node='host3_split')
 
-        js = [j1, j2, j3, j4, j5, j6, j7]
+        j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
+
+        js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11]
         for j in js:
             j.save = mocker.Mock(wraps=j.save)
             j.websocket_emit_status = mocker.Mock()
@@ -245,11 +258,20 @@ class TestReaper():
 
     @pytest.fixture
     def considered_jobs(self, all_jobs):
-        return all_jobs[2:4] + [all_jobs[4]]
+        return all_jobs[2:7] + [all_jobs[10]]
+
+    @pytest.fixture
+    def running_tasks(self, all_jobs):
+        return {
+            'host1': all_jobs[2:5],
+            'host2': all_jobs[5:8],
+            'host3_split': all_jobs[8:10],
+            'host4_offline': [all_jobs[10]],
+        }
 
     @pytest.fixture
     def reapable_jobs(self, all_jobs):
-        return [all_jobs[4]]
+        return [all_jobs[4], all_jobs[7], all_jobs[10]]
 
     @pytest.fixture
     def unconsidered_jobs(self, all_jobs):
@@ -259,16 +281,16 @@ class TestReaper():
     @pytest.fixture
     def active_tasks(self):
         return ([], {
             'host1': ['considered_j2', 'considered_j3', 'considered_j4',],
-            'host2_split': ['host2_j6', 'host2_j7'],
+            'host2': ['considered_j6', 'considered_j7'],
         })
 
     @pytest.mark.django_db
     @mock.patch('awx.main.tasks._send_notification_templates')
     @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
-    def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, mocker):
+    def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, mocker):
         tm = TaskManager()
-        #tm.get_running_tasks = mocker.Mock(return_value=considered_jobs)
+        tm.get_running_tasks = mocker.Mock(return_value=running_tasks)
         tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
 
         tm.cleanup_inconsistent_celery_tasks()
@@ -277,15 +299,16 @@ class TestReaper():
         for j in considered_jobs:
             if j not in reapable_jobs:
                 j.save.assert_not_called()
-        for reaped_job in reapable_jobs:
-            notify.assert_called_once_with(reaped_job, 'failed')
-            reaped_job.websocket_emit_status.assert_called_once_with('failed')
-            assert reaped_job.status == 'failed'
-            assert reaped_job.job_explanation == (
+        assert notify.call_count == 3
+        notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
+
+        for j in reapable_jobs:
+            j.websocket_emit_status.assert_called_once_with('failed')
+            assert j.status == 'failed'
+            assert j.job_explanation == (
                 'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
             )
 
-
     @pytest.mark.django_db
     def test_get_running_tasks(self, all_jobs):
         tm = TaskManager()
@@ -293,13 +316,18 @@ class TestReaper():
         # Ensure the query grabs the expected jobs
         execution_nodes_jobs = tm.get_running_tasks()
         assert 'host1' in execution_nodes_jobs
-        assert 'host2_split' in execution_nodes_jobs
+        assert 'host2' in execution_nodes_jobs
+        assert 'host3_split' in execution_nodes_jobs
 
-
-        assert all_jobs[1] in execution_nodes_jobs['host1']
         assert all_jobs[2] in execution_nodes_jobs['host1']
         assert all_jobs[3] in execution_nodes_jobs['host1']
         assert all_jobs[4] in execution_nodes_jobs['host1']
 
-        assert all_jobs[5] in execution_nodes_jobs['host2_split']
-        assert all_jobs[6] in execution_nodes_jobs['host2_split']
+        assert all_jobs[5] in execution_nodes_jobs['host2']
+        assert all_jobs[6] in execution_nodes_jobs['host2']
+        assert all_jobs[7] in execution_nodes_jobs['host2']
+
+        assert all_jobs[8] in execution_nodes_jobs['host3_split']
+        assert all_jobs[9] in execution_nodes_jobs['host3_split']
+
+        assert all_jobs[10] in execution_nodes_jobs['host4_offline']
diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py
new file mode 100644
index 0000000000..fc0be720c8
--- /dev/null
+++ b/awx/main/tests/unit/test_task_manager.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2017 Ansible by Red Hat
+# All Rights Reserved.
+
+import mock
+import pytest
+
+from django.utils.timezone import now as tz_now
+from django.db import DatabaseError
+
+from awx.main.scheduler import TaskManager
+from awx.main.models import (
+    Job,
+    Instance,
+    InstanceGroup,
+)
+from django.core.cache import cache
+
+
+class TestCleanupInconsistentCeleryTasks():
+    @mock.patch.object(cache, 'get', return_value=None)
+    @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
+    @mock.patch.object(TaskManager, 'get_running_tasks', return_value={'host1': [Job(id=2), Job(id=3),]})
+    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
+    @mock.patch('awx.main.scheduler.logger')
+    def test_instance_does_not_exist(self, logger_mock, *args):
+        logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
+        tm = TaskManager()
+        with pytest.raises(RuntimeError) as excinfo:
+            tm.cleanup_inconsistent_celery_tasks()
+
+        assert "mocked" in str(excinfo.value)
+        logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
+                                                  "The node is currently executing jobs ['None-2-new', "
+                                                  "'None-3-new']")
+
+    @mock.patch.object(cache, 'get', return_value=None)
+    @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
+    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(TaskManager, 'get_running_tasks')
+    @mock.patch('awx.main.scheduler.logger')
+    def test_save_failed(self, logger_mock, get_running_tasks, *args):
+        logger_mock.error = mock.MagicMock()
+        job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
+        job.websocket_emit_status = mock.MagicMock()
+        get_running_tasks.return_value = {'host1': [job]}
+        tm = TaskManager()
+
+        with mock.patch.object(job, 'save', side_effect=DatabaseError):
+            tm.cleanup_inconsistent_celery_tasks()
+            job.save.assert_called_once()
+            logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.")
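
Note (not part of the patch): the new else-branch in cleanup_inconsistent_celery_tasks() boils down to one decision per
execution node, and the sketch below restates that decision as a standalone function. The dict-based instance lookup and
the function name are simplified stand-ins for the real Instance.objects / celery-inspector plumbing, so treat it as an
illustration of the reaping rule rather than the actual AWX code.

    def active_task_ids_for_node(node, active_queues, instances):
        """Decide which celery task ids a node's jobs are checked against.

        Returning [] means every job Tower has recorded on the node becomes a
        reap candidate; returning None means the node is skipped entirely.
        """
        if node in active_queues:
            # celery reported on this node; compare against its real task list
            return active_queues[node]
        instance = instances.get(node)  # stand-in for Instance.objects.get(hostname=node)
        if instance is None:
            # node missing from the database: the real code logs an error, then reaps
            return []
        if instance['capacity'] == 0:
            # Tower already considers the node offline: reap everything on it
            return []
        # node looks healthy but celery gave us no task list; leave its jobs alone
        return None


    # An offline node's jobs always become reap candidates:
    assert active_task_ids_for_node(
        'host4_offline',
        active_queues={'host1': ['considered_j2']},
        instances={'host4_offline': {'capacity': 0}},
    ) == []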