Mirror of https://github.com/ansible/awx.git (synced 2024-10-31 15:21:13 +03:00)
do not include workflow jobs for reaping
* Workflow jobs are virtual jobs that don't actually run, so they never get a celery task id and aren't candidates for generic reaping.
* Better error logging when the execution node Instance is not found in the reaping code.
commit d615e2e9ff (parent 32fea6ef19)
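Why this matters, in miniature: a workflow job is virtual and never receives a celery task id, so a reaper that compares running jobs against the set of active celery task ids would always treat it as orphaned. The toy sketch below is plain Python, not AWX code; every name in it is illustrative only.

# Toy sketch only, not AWX code: a workflow job has no celery task id, so a
# generic reaper that matches running jobs against active celery task ids
# would always flag it as orphaned and mark it failed.
def reap_orphaned(running_jobs, active_task_ids):
    reaped = []
    for job in running_jobs:
        if job['celery_task_id'] not in active_task_ids:
            reaped.append(job['name'])  # would be marked failed by the reaper
    return reaped

jobs = [
    {'name': 'job 1', 'celery_task_id': 'abc123'},
    {'name': 'workflow job', 'celery_task_id': None},  # virtual, never actually runs
]
print(reap_orphaned(jobs, {'abc123'}))  # ['workflow job'] -- the false positive this commit avoids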
@@ -14,6 +14,7 @@ from django.db import transaction, connection, DatabaseError
 from django.utils.translation import ugettext_lazy as _
 from django.utils.timezone import now as tz_now, utc
 from django.db.models import Q
+from django.contrib.contenttypes.models import ContentType
 
 # AWX
 from awx.main.models import * # noqa
@@ -78,8 +79,10 @@ class TaskManager():
     def get_running_tasks(self):
         execution_nodes = {}
         now = tz_now()
-        jobs = UnifiedJob.objects.filter(Q(status='running') |
-                                         Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
+        workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
+        jobs = UnifiedJob.objects.filter((Q(status='running') |
+                                         Q(status='waiting', modified__lte=now - timedelta(seconds=60))) &
+                                         ~Q(polymorphic_ctype_id=workflow_ctype_id))
         [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
         return execution_nodes
 
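A hedged sketch of the exclusion pattern the new query uses, assuming a configured Django/AWX environment (the helper name and its arguments are illustrative, not part of the commit): django-polymorphic stores each row's concrete type in polymorphic_ctype_id, so one extra ~Q() drops every WorkflowJob from the UnifiedJob queryset.

# Sketch, assuming a configured Django project; not runnable standalone.
from datetime import timedelta

from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.utils.timezone import now as tz_now


def running_jobs_excluding(model_cls, job_qs):
    """Return running/stale-waiting rows from job_qs, minus rows of model_cls."""
    excluded_ctype_id = ContentType.objects.get_for_model(model_cls).id
    cutoff = tz_now() - timedelta(seconds=60)
    return job_qs.filter(
        (Q(status='running') | Q(status='waiting', modified__lte=cutoff))
        & ~Q(polymorphic_ctype_id=excluded_ctype_id)
    )

# e.g. running_jobs_excluding(WorkflowJob, UnifiedJob.objects.all())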
@@ -445,7 +448,8 @@ class TaskManager():
                 continue
             except Instance.DoesNotExist:
                 logger.error("Execution node Instance {} not found in database. "
-                             "The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs]))
+                             "The node is currently executing jobs {}".format(node,
+                                                                              [j.log_format for j in node_jobs]))
                 active_tasks = []
             for task in node_jobs:
                 if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
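The second bullet of the commit message is the logging change above: str(job) on a unified job renders the terse 'None-2-new' form seen in the old test expectation further down, while log_format renders the readable 'job 2 (new)' form. A toy stand-in (not the AWX model) that mimics just those two renderings:

# Toy stand-in, not the AWX UnifiedJob model: mimics the two renderings the
# tests below assert on ('None-2-new' via str() vs 'job 2 (new)' via log_format).
class FakeJob:
    def __init__(self, pk, status, name=None):
        self.pk, self.status, self.name = pk, status, name

    def __str__(self):
        return '{}-{}-{}'.format(self.name, self.pk, self.status)  # e.g. 'None-2-new'

    @property
    def log_format(self):
        return 'job {} ({})'.format(self.pk, self.status)  # e.g. 'job 2 (new)'


j = FakeJob(pk=2, status='new')
print(str(j), '->', j.log_format)  # None-2-new -> job 2 (new)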
@@ -9,6 +9,7 @@ from awx.main.scheduler import TaskManager
 from awx.main.models import (
     Job,
     Instance,
+    WorkflowJob,
 )
 
 
@@ -250,7 +251,9 @@ class TestReaper():
 
         j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
 
-        js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11]
+        j12 = WorkflowJob.objects.create(status='running', celery_task_id='workflow_job', execution_node='host1')
+
+        js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12]
         for j in js:
             j.save = mocker.Mock(wraps=j.save)
             j.websocket_emit_status = mocker.Mock()
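The fixture above wraps each job's save with mocker.Mock(wraps=...); here is a minimal standalone illustration of that pattern using unittest.mock directly (toy Job class, not AWX code): the wrapped method still runs, but calls to it are recorded and assertable.

# Minimal illustration of the Mock(wraps=...) pattern used in the fixture above.
from unittest import mock


class Job:
    def __init__(self):
        self.saved = False

    def save(self):
        self.saved = True


j = Job()
j.save = mock.Mock(wraps=j.save)
j.save()
assert j.saved                # the real save() still executed
j.save.assert_called_once()   # ...and the call was recorded by the mock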
@@ -263,7 +266,7 @@ class TestReaper():
     @pytest.fixture
     def running_tasks(self, all_jobs):
         return {
-            'host1': all_jobs[2:5],
+            'host1': all_jobs[2:5] + [all_jobs[11]],
             'host2': all_jobs[5:8],
             'host3_split': all_jobs[8:10],
             'host4_offline': [all_jobs[10]],
@@ -331,3 +334,5 @@ class TestReaper():
         assert all_jobs[9] in execution_nodes_jobs['host3_split']
 
         assert all_jobs[10] in execution_nodes_jobs['host4_offline']
+
+        assert all_jobs[11] not in execution_nodes_jobs['host1']
@@ -31,8 +31,8 @@ class TestCleanupInconsistentCeleryTasks():
 
         assert "mocked" in str(excinfo.value)
         logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
-                                                  "The node is currently executing jobs ['None-2-new', "
-                                                  "'None-3-new']")
+                                                  "The node is currently executing jobs ['job 2 (new)', "
+                                                  "'job 3 (new)']")
 
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
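The updated expectation relies on assert_called_once_with, which verifies both that the mock was called exactly once and that the arguments match exactly; a tiny standalone illustration with a throwaway mock logger (not the AWX test itself):

# Standalone illustration of assert_called_once_with: it raises AssertionError
# if the mock was called more than once or with different arguments.
from unittest import mock

logger = mock.Mock()
logger.error("Execution node Instance host1 not found in database. "
             "The node is currently executing jobs ['job 2 (new)', 'job 3 (new)']")
logger.error.assert_called_once_with(
    "Execution node Instance host1 not found in database. "
    "The node is currently executing jobs ['job 2 (new)', 'job 3 (new)']")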