1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 23:51:09 +03:00

Replace zeromq with multiprocessing queue for worker ipc in the callback

receiever module
This commit is contained in:
Matthew Jones 2014-09-02 16:06:52 -04:00
parent ab971215c8
commit b2c77a650b

View File

@ -10,7 +10,7 @@ import json
import signal
import time
from optparse import make_option
from multiprocessing import Process
from multiprocessing import Process, Queue
# Django
from django.conf import settings
@ -59,12 +59,8 @@ class CallbackReceiver(object):
if use_workers:
connection.close()
for idx in range(WORKERS):
queue_port_actual = queue_port + "-%s" % str(idx)
queue_context = zmq.Context()
queue_publisher = queue_context.socket(zmq.PUSH)
queue_publisher.bind(queue_port_actual)
w = Process(target=self.callback_worker, args=(queue_port_actual,))
queue_actual = Queue()
w = Process(target=self.callback_worker, args=(queue_actual,))
w.daemon = True
w.start()
@ -72,7 +68,7 @@ class CallbackReceiver(object):
signal.signal(signal.SIGTERM, shutdown_handler([w]))
if settings.DEBUG:
print 'Started worker %s' % str(idx)
worker_queues.append([0, queue_publisher, w])
worker_queues.append([0, queue_actual, w])
elif settings.DEBUG:
print 'Started callback receiver (no workers)'
@ -106,7 +102,7 @@ class CallbackReceiver(object):
else:
parent = None
if parent is not None:
message['parent'] = parent.id
message['parent'] = parent
if 'created' in message:
del(message['created'])
if message['event'] in ('playbook_on_start', 'playbook_on_play_start',
@ -115,18 +111,18 @@ class CallbackReceiver(object):
else:
if message['event'] == 'playbook_on_stats':
job_parent_events = {}
queue_actual = worker_queues[total_messages % WORKERS]
queue_actual[0] += 1
queue_actual[1].send_json(message)
if queue_actual[0] >= MAX_REQUESTS:
queue_actual[0] = 0
queue_actual_worker = worker_queues[total_messages % WORKERS]
queue_actual_worker[0] += 1
queue_actual_worker[1].put(message)
if queue_actual_worker[0] >= MAX_REQUESTS:
queue_actual_worker[0] = 0
print("Recycling worker process")
queue_actual[2].join()
queue_actual_worker[2].join()
connection.close()
w = Process(target=self.callback_worker, args=(queue_port + "-%s" % str(total_messages % WORKERS),))
w = Process(target=self.callback_worker, args=(queue_actual_worker[1],))
w.daemon = True
w.start()
queue_actual[2] = w
queue_actual_worker[2] = w
last_parent_events[message['job_id']] = job_parent_events
consumer_subscriber.send("1")
@ -162,9 +158,9 @@ class CallbackReceiver(object):
data['event_data']['res']['invocation']['module_args'] = ""
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.parent_id = parent_id
job_event.save(post_process=True)
return job_event
return job_event.id
except DatabaseError as e:
transaction.rollback()
print('Database error saving job event, retrying in '
@ -175,20 +171,14 @@ class CallbackReceiver(object):
retry_count)
return None
def callback_worker(self, port):
def callback_worker(self, queue_actual):
messages_processed = 0
pool_context = zmq.Context()
pool_subscriber = pool_context.socket(zmq.PULL)
pool_subscriber.connect(port)
while True:
message = pool_subscriber.recv_json()
message = queue_actual.get()
self.process_job_event(message)
messages_processed += 1
if messages_processed >= MAX_REQUESTS:
print("Shutting down message receiver")
pool_subscriber.close()
pool_context.term()
time.sleep(0.1)
sys.exit(0)
class Command(NoArgsCommand):