
task manager using messages

* First pass: adapt the singleton task manager to process messages and run
jobs based on events instead of a busy loop.
* Still need to make message handling run in Celery, not in a consumption
loop (see the sketch below).
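
For orientation, a minimal sketch of the publish side of that flow. It mirrors what UnifiedJob.signal_start() does in the diff below, but BROKER_URL, SCHEDULER_QUEUE and the publish_job_launch helper are illustrative stand-ins, not the actual Django settings or commit code:

from kombu import Connection, Exchange, Producer

# Illustrative stand-ins for settings.BROKER_URL / settings.SCHEDULER_QUEUE.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
SCHEDULER_QUEUE = 'scheduler'

def publish_job_launch(job_id):
    # Announce a launch on the topic exchange so the task manager reacts to
    # an event instead of polling the database in a busy loop.
    with Connection(BROKER_URL) as connection:
        exchange = Exchange(SCHEDULER_QUEUE, type='topic')
        producer = Producer(connection)
        producer.publish({'msg_type': 'job_launch', 'job_id': job_id},
                         serializer='json',
                         compression='bzip2',
                         exchange=exchange,
                         declare=[exchange],
                         routing_key='scheduler.job.launch')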
Chris Meyers 2016-09-20 10:14:38 -04:00
parent 609a3e6f2f
commit cc90204b0f
8 changed files with 386 additions and 250 deletions

View File

@ -2290,6 +2290,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView):
new_job = obj.create_unified_job(**kv)
result = new_job.signal_start(**kv)
if not result:
data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start)
new_job.delete()

View File

@ -7,6 +7,10 @@ import datetime
import logging
import signal
import time
import traceback
from kombu import Connection, Exchange, Queue, Producer
from kombu.mixins import ConsumerMixin
# Django
from django.conf import settings
@ -17,6 +21,8 @@ from awx.main.models import * # noqa
from awx.main.queue import FifoQueue
from awx.main.tasks import handle_work_error, handle_work_success
from awx.main.utils import get_system_task_capacity
from awx.main.scheduler.dag_simple import SimpleDAG
from awx.main.scheduler.dag_workflow import WorkflowDAG
# Celery
from celery.task.control import inspect
@ -25,208 +31,6 @@ logger = logging.getLogger('awx.main.commands.run_task_system')
queue = FifoQueue('tower_task_manager')
class SimpleDAG(object):
''' A simple implementation of a directed acyclic graph '''
def __init__(self):
self.nodes = []
self.edges = []
def __contains__(self, obj):
for node in self.nodes:
if node['node_object'] == obj:
return True
return False
def __len__(self):
return len(self.nodes)
def __iter__(self):
return self.nodes.__iter__()
def generate_graphviz_plot(self):
def short_string_obj(obj):
if type(obj) == Job:
type_str = "Job"
elif type(obj) == AdHocCommand:
type_str = "AdHocCommand"
elif type(obj) == InventoryUpdate:
type_str = "Inventory"
elif type(obj) == ProjectUpdate:
type_str = "Project"
elif type(obj) == WorkflowJob:
type_str = "Workflow"
else:
type_str = "Unknown"
type_str += "%s" % str(obj.id)
return type_str
doc = """
digraph g {
rankdir = LR
"""
for n in self.nodes:
doc += "%s [color = %s]\n" % (
short_string_obj(n['node_object']),
"red" if n['node_object'].status == 'running' else "black",
)
for from_node, to_node, label in self.edges:
doc += "%s -> %s [ label=\"%s\" ];\n" % (
short_string_obj(self.nodes[from_node]['node_object']),
short_string_obj(self.nodes[to_node]['node_object']),
label,
)
doc += "}\n"
gv_file = open('/tmp/graph.gv', 'w')
gv_file.write(doc)
gv_file.close()
def add_node(self, obj, metadata=None):
if self.find_ord(obj) is None:
self.nodes.append(dict(node_object=obj, metadata=metadata))
def add_edge(self, from_obj, to_obj, label=None):
from_obj_ord = self.find_ord(from_obj)
to_obj_ord = self.find_ord(to_obj)
if from_obj_ord is None or to_obj_ord is None:
raise LookupError("Object not found")
self.edges.append((from_obj_ord, to_obj_ord, label))
def add_edges(self, edgelist):
for edge_pair in edgelist:
self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
def find_ord(self, obj):
for idx in range(len(self.nodes)):
if obj == self.nodes[idx]['node_object']:
return idx
return None
def get_node_type(self, obj):
if type(obj) == Job:
return "job"
elif type(obj) == AdHocCommand:
return "ad_hoc_command"
elif type(obj) == InventoryUpdate:
return "inventory_update"
elif type(obj) == ProjectUpdate:
return "project_update"
elif type(obj) == SystemJob:
return "system_job"
elif type(obj) == WorkflowJob:
return "workflow_job"
return "unknown"
def get_dependencies(self, obj, label=None):
antecedents = []
this_ord = self.find_ord(obj)
for node, dep, lbl in self.edges:
if label:
if node == this_ord and lbl == label:
antecedents.append(self.nodes[dep])
else:
if node == this_ord:
antecedents.append(self.nodes[dep])
return antecedents
def get_dependents(self, obj, label=None):
decendents = []
this_ord = self.find_ord(obj)
for node, dep, lbl in self.edges:
if label:
if dep == this_ord and lbl == label:
decendents.append(self.nodes[node])
else:
if dep == this_ord:
decendents.append(self.nodes[node])
return decendents
def get_leaf_nodes(self):
leafs = []
for n in self.nodes:
if len(self.get_dependencies(n['node_object'])) < 1:
leafs.append(n)
return leafs
def get_root_nodes(self):
roots = []
for n in self.nodes:
if len(self.get_dependents(n['node_object'])) < 1:
roots.append(n)
return roots
class WorkflowDAG(SimpleDAG):
def __init__(self, workflow_job=None):
super(WorkflowDAG, self).__init__()
if workflow_job:
self._init_graph(workflow_job)
def _init_graph(self, workflow_job):
workflow_nodes = workflow_job.workflow_job_nodes.all()
for workflow_node in workflow_nodes:
self.add_node(workflow_node)
for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
for workflow_node in workflow_nodes:
related_nodes = getattr(workflow_node, node_type).all()
for related_node in related_nodes:
self.add_edge(workflow_node, related_node, node_type)
def bfs_nodes_to_run(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
nodes_found = []
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'error', 'successful']:
continue
elif job.status in ['failed', 'error']:
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status in ['successful']:
children_success = self.get_dependencies(obj, 'success_nodes')
nodes.extend(children_success)
else:
logger.warn("Incorrect graph structure")
return [n['node_object'] for n in nodes_found]
def is_workflow_done(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job:
return False
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'error', 'successful']:
return False
elif job.status in ['failed', 'error']:
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status in ['successful']:
children_success = self.get_dependencies(obj, 'success_nodes')
nodes.extend(children_success)
else:
logger.warn("Incorrect graph structure")
return True
def get_tasks():
"""Fetch all Tower tasks that are relevant to the task management
system.
@ -247,6 +51,7 @@ def get_tasks():
graph_project_updates + graph_system_jobs +
graph_workflow_jobs,
key=lambda task: task.created)
print("Returning all_actions %s" % len(all_actions))
return all_actions
def get_running_workflow_jobs():
@ -277,14 +82,16 @@ def do_spawn_workflow_jobs():
#emit_websocket_notification('/socket.io/jobs', '', dict(id=))
def rebuild_graph(message):
def rebuild_graph():
"""Regenerate the task graph by refreshing known tasks from Tower, purging
orphaned running tasks, and creating dependencies for new tasks before
generating directed edge relationships between those tasks.
"""
'''
# Sanity check: Only do this on the primary node.
if Instance.objects.my_role() == 'secondary':
return None
'''
inspector = inspect()
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
@ -297,6 +104,7 @@ def rebuild_graph(message):
all_sorted_tasks = get_tasks()
if not len(all_sorted_tasks):
print("All sorted task len is not? <%s, %s>" % (len(all_sorted_tasks), all_sorted_tasks))
return None
active_tasks = []
@ -417,53 +225,132 @@ def process_graph(graph, task_capacity):
'Remaining Capacity: %s' %
(str(node_obj), str(impact), str(remaining_volume)))
def run_taskmanager():
"""Receive task start and finish signals to rebuild a dependency graph
and manage the actual running of tasks.
"""
def shutdown_handler():
def _handler(signum, frame):
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
return _handler
signal.signal(signal.SIGINT, shutdown_handler())
signal.signal(signal.SIGTERM, shutdown_handler())
paused = False
task_capacity = get_system_task_capacity()
last_rebuild = datetime.datetime.fromtimestamp(0)
# Attempt to pull messages off of the task system queue into perpetuity.
#
# A quick explanation of what is happening here:
# The popping messages off the queue bit is something of a sham. We remove
# the messages from the queue and then immediately throw them away. The
# `rebuild_graph` function, while it takes the message as an argument,
# ignores it.
#
# What actually happens is that we just check the database every 10 seconds
# to see what the task dependency graph looks like, and go do that. This
# is the job of the `rebuild_graph` function.
#
# There is some placeholder here: we may choose to actually use the message
# in the future.
while True:
# Pop a message off the queue.
# (If the queue is empty, None will be returned.)
message = queue.pop()
#logger = logging.getLogger('awx.main.scheduler')
# Parse out the message appropriately, rebuilding our graph if
# appropriate.
if (datetime.datetime.now() - last_rebuild).seconds > 10:
if message is not None and 'pause' in message:
logger.info("Pause command received: %s" % str(message))
paused = message['pause']
graph = rebuild_graph(message)
if not paused and graph is not None:
process_graph(graph, task_capacity)
last_rebuild = datetime.datetime.now()
time.sleep(0.1)
class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
print("get_consumers() OK")
return [Consumer(queues=[Queue(settings.SCHEDULER_QUEUE,
Exchange(settings.SCHEDULER_QUEUE, type='topic'),
routing_key='scheduler.job.launch'),],
accept=['json'],
callbacks=[self.process_job_launch,]),
Consumer(queues=[Queue(settings.SCHEDULER_QUEUE,
Exchange(settings.SCHEDULER_QUEUE, type='topic'),
routing_key='scheduler.job.complete'),],
accept=['json'],
callbacks=[self.process_job_complete,]
)]
def schedule(self):
task_capacity = get_system_task_capacity()
graph = rebuild_graph()
if graph:
process_graph(graph, task_capacity)
def process_job_msg(self, body, message):
try:
if settings.DEBUG:
logger.info("Body: {}".format(body))
logger.info("Message: {}".format(message))
if "msg_type" not in body:
raise Exception("Payload does not have a msg_type")
if "job_id" not in body:
raise Exception("Payload does not have a job_id")
func = getattr(self, "process_%s" % body['msg_type'], None)
if not func:
raise AttributeError("No processor for message type %s" % body['msg_type'])
func(body)
# Raised by processors when msg isn't in the expected form.
except LookupError as e:
logger.error(e)
except AttributeError as e:
logger.error(e)
except Exception as exc:
import traceback
traceback.print_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc)
finally:
message.ack()
self.schedule()
def process_job_launch(self, body, message):
print("process_job_launch()")
if "job_id" not in body:
raise KeyError("Payload does not contain job_id")
'''
Wait for job to exist.
The job is created in a transaction then the message is created, but
the transaction may not have completed.
FIXME: We could generate the message in a Django signal handler.
OR, we could call an explicit commit in the view and then send the
message.
'''
retries = 10
retry = 0
while not UnifiedJob.objects.filter(id=body['job_id']).exists():
time.sleep(0.3)
if retry >= retries:
logger.error("Failed to process 'job_launch' message for job %d" % body['job_id'])
# ack the message so we don't build up the queue.
#
# The job can still be chosen to run during tower startup or
# when another job is started or completes
message.ack()
return
retry += 1
job = UnifiedJob.objects.get(id=body['job_id'])
self.schedule()
message.ack()
def process_job_complete(self, body, message):
print("process_job_complete()")
if "job_id" not in body:
raise KeyError("Payload does not contain job_id")
# TODO: use list of finished status from jobs.py or unified_jobs.py
finished_status = ['successful', 'error', 'failed', 'completed']
q = UnifiedJob.objects.filter(id=body['job_id'])
# Ensure that the job is updated in the database before we call to
# schedule the next job.
retries = 10
retry = 0
while True:
# Job not found, most likely deleted. That's fine
if not q.exists():
logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % body['job_id'])
break
job = q[0]
if job.status in finished_status:
break
time.sleep(0.3)
if retry >= retries:
logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status))
message.ack()
return
retry += 1
message.ack()
self.schedule()
class Command(NoArgsCommand):
"""Tower Task Management System
This daemon is designed to reside between our tasks and celery and
@ -477,7 +364,11 @@ class Command(NoArgsCommand):
help = 'Launch the Tower task management system'
def handle_noargs(self, **options):
try:
run_taskmanager()
except KeyboardInterrupt:
pass
with Connection(settings.BROKER_URL) as conn:
try:
worker = CallbackBrokerWorker(conn)
worker.run()
except KeyboardInterrupt:
print('Terminating Task Management System')
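
One way to close the race that the retry loop in process_job_launch() guards against — the launch message arriving before the job's row has been committed — is the option named in the FIXME above: send the message only after the transaction completes. A hedged sketch, assuming a Django version that provides transaction.on_commit (1.9+) and reusing the illustrative publish_job_launch helper from earlier:

from django.db import transaction

def signal_start_after_commit(job):
    # 'job' is a saved UnifiedJob inside the request's transaction; the
    # message goes out only once the INSERT is visible to other connections,
    # so the consumer no longer needs to poll for the row.
    transaction.on_commit(lambda: publish_job_launch(job.id))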

View File

@ -852,6 +852,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.socketio_emit_status("pending")
from kombu import Connection, Exchange, Producer
connection = Connection(settings.BROKER_URL)
exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic')
producer = Producer(connection)
producer.publish({ 'msg_type': 'job_launch', 'job_id': self.id },
serializer='json',
compression='bzip2',
exchange=exchange,
declare=[exchange],
routing_key='scheduler.job.launch')
# Each type of unified job has a different Task class; get the
appropriate one.
# task_type = get_type_for_model(self)
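
A quick way to confirm that signal_start() actually published is to drain one message from the scheduler queue with kombu's SimpleQueue. This is a development aid under assumed broker settings, not part of the commit; the queue, exchange and routing key match the diff above:

from kombu import Connection, Exchange, Queue

with Connection('amqp://guest:guest@localhost:5672//') as conn:  # assumed broker
    exchange = Exchange('scheduler', type='topic')
    queue = Queue('scheduler', exchange, routing_key='scheduler.job.launch')
    simple = conn.SimpleQueue(queue)
    msg = simple.get(block=True, timeout=5)
    print(msg.payload)   # e.g. {'msg_type': 'job_launch', 'job_id': 42}
    msg.ack()
    simple.close()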

View File

View File

@ -0,0 +1,133 @@
from awx.main.models import * # noqa
class SimpleDAG(object):
''' A simple implementation of a directed acyclic graph '''
def __init__(self):
self.nodes = []
self.edges = []
def __contains__(self, obj):
for node in self.nodes:
if node['node_object'] == obj:
return True
return False
def __len__(self):
return len(self.nodes)
def __iter__(self):
return self.nodes.__iter__()
def generate_graphviz_plot(self):
def short_string_obj(obj):
if type(obj) == Job:
type_str = "Job"
elif type(obj) == AdHocCommand:
type_str = "AdHocCommand"
elif type(obj) == InventoryUpdate:
type_str = "Inventory"
elif type(obj) == ProjectUpdate:
type_str = "Project"
elif type(obj) == WorkflowJob:
type_str = "Workflow"
else:
type_str = "Unknown"
type_str += "%s" % str(obj.id)
return type_str
doc = """
digraph g {
rankdir = LR
"""
for n in self.nodes:
doc += "%s [color = %s]\n" % (
short_string_obj(n['node_object']),
"red" if n['node_object'].status == 'running' else "black",
)
for from_node, to_node, label in self.edges:
doc += "%s -> %s [ label=\"%s\" ];\n" % (
short_string_obj(self.nodes[from_node]['node_object']),
short_string_obj(self.nodes[to_node]['node_object']),
label,
)
doc += "}\n"
gv_file = open('/tmp/graph.gv', 'w')
gv_file.write(doc)
gv_file.close()
def add_node(self, obj, metadata=None):
if self.find_ord(obj) is None:
self.nodes.append(dict(node_object=obj, metadata=metadata))
def add_edge(self, from_obj, to_obj, label=None):
from_obj_ord = self.find_ord(from_obj)
to_obj_ord = self.find_ord(to_obj)
if from_obj_ord is None or to_obj_ord is None:
raise LookupError("Object not found")
self.edges.append((from_obj_ord, to_obj_ord, label))
def add_edges(self, edgelist):
for edge_pair in edgelist:
self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
def find_ord(self, obj):
for idx in range(len(self.nodes)):
if obj == self.nodes[idx]['node_object']:
return idx
return None
def get_node_type(self, obj):
if type(obj) == Job:
return "job"
elif type(obj) == AdHocCommand:
return "ad_hoc_command"
elif type(obj) == InventoryUpdate:
return "inventory_update"
elif type(obj) == ProjectUpdate:
return "project_update"
elif type(obj) == SystemJob:
return "system_job"
elif type(obj) == WorkflowJob:
return "workflow_job"
return "unknown"
def get_dependencies(self, obj, label=None):
antecedents = []
this_ord = self.find_ord(obj)
for node, dep, lbl in self.edges:
if label:
if node == this_ord and lbl == label:
antecedents.append(self.nodes[dep])
else:
if node == this_ord:
antecedents.append(self.nodes[dep])
return antecedents
def get_dependents(self, obj, label=None):
decendents = []
this_ord = self.find_ord(obj)
for node, dep, lbl in self.edges:
if label:
if dep == this_ord and lbl == label:
decendents.append(self.nodes[node])
else:
if dep == this_ord:
decendents.append(self.nodes[node])
return decendents
def get_leaf_nodes(self):
leafs = []
for n in self.nodes:
if len(self.get_dependencies(n['node_object'])) < 1:
leafs.append(n)
return leafs
def get_root_nodes(self):
roots = []
for n in self.nodes:
if len(self.get_dependents(n['node_object'])) < 1:
roots.append(n)
return roots

View File

@ -0,0 +1,74 @@
import logging

from dag_simple import SimpleDAG

logger = logging.getLogger('awx.main.scheduler')
class WorkflowDAG(SimpleDAG):
def __init__(self, workflow_job=None):
super(WorkflowDAG, self).__init__()
if workflow_job:
self._init_graph(workflow_job)
def _init_graph(self, workflow_job):
workflow_nodes = workflow_job.workflow_job_nodes.all()
for workflow_node in workflow_nodes:
self.add_node(workflow_node)
for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
for workflow_node in workflow_nodes:
related_nodes = getattr(workflow_node, node_type).all()
for related_node in related_nodes:
self.add_edge(workflow_node, related_node, node_type)
def bfs_nodes_to_run(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
nodes_found = []
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'error', 'successful']:
continue
elif job.status in ['failed', 'error']:
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status in ['successful']:
children_success = self.get_dependencies(obj, 'success_nodes')
nodes.extend(children_success)
else:
logger.warn("Incorrect graph structure")
return [n['node_object'] for n in nodes_found]
def is_workflow_done(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job:
return False
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'error', 'successful']:
return False
elif job.status in ['failed', 'error']:
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status in ['successful']:
children_success = self.get_dependencies(obj, 'success_nodes')
nodes.extend(children_success)
else:
logger.warn("Incorrect graph structure")
return True
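
For reference, a hedged sketch of how the scheduler is expected to drive this class; advance_workflow is an illustrative helper and workflow_job stands for a WorkflowJob whose node relationships have been saved:

from awx.main.scheduler.dag_workflow import WorkflowDAG

def advance_workflow(workflow_job):
    # Build the graph from the workflow's nodes and edge types
    # (success/failure/always), then walk it breadth-first.
    dag = WorkflowDAG(workflow_job)
    for node in dag.bfs_nodes_to_run():
        # Nodes with no job yet, whose parents finished with a matching
        # status, are the candidates to spawn next.
        print("Would spawn a job for workflow node %s" % node.pk)
    # True once every reachable branch has a finished job.
    return dag.is_workflow_done()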

View File

@ -31,6 +31,9 @@ except:
# Pexpect
import pexpect
# Kombu
from kombu import Connection, Exchange, Queue, Producer
# Celery
from celery import Task, task
from celery.signals import celeryd_init
@ -202,6 +205,18 @@ def _send_notification_templates(instance, status_str):
for n in all_notification_templates],
job_id=instance.id)
def _send_job_complete_msg(instance):
connection = Connection(settings.BROKER_URL)
exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic')
producer = Producer(connection)
producer.publish({ 'job_id': instance.id, 'msg_type': 'job_complete' },
serializer='json',
compression='bzip2',
exchange=exchange,
declare=[exchange],
routing_key='scheduler.job.complete')
@task(bind=True, queue='default')
def handle_work_success(self, result, task_actual):
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@ -210,6 +225,8 @@ def handle_work_success(self, result, task_actual):
_send_notification_templates(instance, 'succeeded')
_send_job_complete_msg(instance)
@task(bind=True, queue='default')
def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' %
@ -238,6 +255,9 @@ def handle_work_error(self, task_id, subtasks=None):
if first_instance:
_send_notification_templates(first_instance, 'failed')
if first_instance:
_send_job_complete_msg(first_instance)
@task(queue='default')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):

View File

@ -342,6 +342,7 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('jobs', Exchange('jobs'), routing_key='jobs'),
#Queue('scheduler', Exchange('scheduler'), routing_key='scheduler.job.#'),
# Projects use a fanout queue, this isn't super well supported
Broadcast('projects'),
)
@ -737,6 +738,7 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False
INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
CALLBACK_QUEUE = "callback_tasks"
SCHEDULER_QUEUE = "scheduler"
TASK_COMMAND_PORT = 6559
@ -1042,6 +1044,10 @@ LOGGING = {
'handlers': ['console', 'file', 'task_system'],
'propagate': False
},
'awx.main.scheduler': {
'handlers': ['console', 'file', 'task_system'],
'propagate': False
},
'awx.main.commands.run_fact_cache_receiver': {
'handlers': ['console', 'file', 'fact_receiver'],
'propagate': False