1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 15:21:13 +03:00

Merge pull request #285 from chrismeyersfsu/fix-reap_workflow_jobs

do not include workflow jobs for reaping
This commit is contained in:
Chris Meyers 2017-08-16 09:56:39 -04:00 committed by GitHub
commit 2bccb5e753
3 changed files with 16 additions and 7 deletions

View File

@ -14,6 +14,7 @@ from django.db import transaction, connection, DatabaseError
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now as tz_now, utc
from django.db.models import Q
from django.contrib.contenttypes.models import ContentType
# AWX
from awx.main.models import * # noqa
@ -78,8 +79,10 @@ class TaskManager():
def get_running_tasks(self):
execution_nodes = {}
now = tz_now()
jobs = UnifiedJob.objects.filter(Q(status='running') |
Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter((Q(status='running') |
Q(status='waiting', modified__lte=now - timedelta(seconds=60))) &
~Q(polymorphic_ctype_id=workflow_ctype_id))
[execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
return execution_nodes
@ -445,7 +448,8 @@ class TaskManager():
continue
except Instance.DoesNotExist:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs]))
"The node is currently executing jobs {}".format(node,
[j.log_format for j in node_jobs]))
active_tasks = []
for task in node_jobs:
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):

View File

@ -9,6 +9,7 @@ from awx.main.scheduler import TaskManager
from awx.main.models import (
Job,
Instance,
WorkflowJob,
)
@ -250,7 +251,9 @@ class TestReaper():
j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11]
j12 = WorkflowJob.objects.create(status='running', celery_task_id='workflow_job', execution_node='host1')
js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12]
for j in js:
j.save = mocker.Mock(wraps=j.save)
j.websocket_emit_status = mocker.Mock()
@ -263,7 +266,7 @@ class TestReaper():
@pytest.fixture
def running_tasks(self, all_jobs):
return {
'host1': all_jobs[2:5],
'host1': all_jobs[2:5] + [all_jobs[11]],
'host2': all_jobs[5:8],
'host3_split': all_jobs[8:10],
'host4_offline': [all_jobs[10]],
@ -331,3 +334,5 @@ class TestReaper():
assert all_jobs[9] in execution_nodes_jobs['host3_split']
assert all_jobs[10] in execution_nodes_jobs['host4_offline']
assert all_jobs[11] not in execution_nodes_jobs['host1']

View File

@ -31,8 +31,8 @@ class TestCleanupInconsistentCeleryTasks():
assert "mocked" in str(excinfo.value)
logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
"The node is currently executing jobs ['None-2-new', "
"'None-3-new']")
"The node is currently executing jobs ['job 2 (new)', "
"'job 3 (new)']")
@mock.patch.object(cache, 'get', return_value=None)
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))