1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 16:51:11 +03:00

Implement a recycling process for consuming job events, to keep the
worker's Python heap from growing without bound.
This commit is contained in:
This commit is contained in:
Matthew Jones 2014-06-30 10:03:26 -05:00
parent 99c0b089ea
commit e627ee0711

View File

@ -3,6 +3,7 @@
# Python # Python
import os import os
import sys
import datetime import datetime
import logging import logging
import json import json
@ -26,6 +27,8 @@ from awx.main.models import *
# ZeroMQ # ZeroMQ
import zmq import zmq
MAX_REQUESTS = 20000
class CallbackReceiver(object): class CallbackReceiver(object):
def __init__(self): def __init__(self):
@ -54,27 +57,36 @@ class CallbackReceiver(object):
queue_publisher.bind(queue_port) queue_publisher.bind(queue_port)
if use_workers: if use_workers:
workers = []
for idx in range(4):
w = Process(target=self.callback_worker, args=(queue_port,)) w = Process(target=self.callback_worker, args=(queue_port,))
w.daemon = True w.daemon = True
w.start() w.start()
workers.append(w)
signal.signal(signal.SIGINT, shutdown_handler(workers)) signal.signal(signal.SIGINT, shutdown_handler([w]))
signal.signal(signal.SIGTERM, shutdown_handler(workers)) signal.signal(signal.SIGTERM, shutdown_handler([w]))
if settings.DEBUG: if settings.DEBUG:
print 'Started callback receiver (4 workers)' print 'Started worker'
elif settings.DEBUG: elif settings.DEBUG:
print 'Started callback receiver (no workers)' print 'Started callback receiver (no workers)'
message_number = 0
while True: # Handle signal while True: # Handle signal
message = consumer_subscriber.recv_json() message = consumer_subscriber.recv_json()
if True: # check_pre_handle(message) or not use_workers: message_number += 1
if not use_workers:
self.process_job_event(message) self.process_job_event(message)
else: else:
queue_publisher.send_json(message) queue_publisher.send_json(message)
if message_number >= MAX_REQUESTS:
message_number = 0
print("Recycling worker process")
w.join()
w = Process(target=self.callback_worker, args=(queue_port,))
w.daemon = True
w.start()
consumer_subscriber.send("1") consumer_subscriber.send("1")
# NOTE: This cache doesn't work too terribly well but it can help prevent database queries
# we may want to use something like memcached here instead
def process_parent_cache(self, job_id, event_object): def process_parent_cache(self, job_id, event_object):
if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'): if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'):
return return
@ -155,12 +167,18 @@ class CallbackReceiver(object):
retry_count) retry_count)
def callback_worker(self, port): def callback_worker(self, port):
messages_processed = 0
pool_context = zmq.Context() pool_context = zmq.Context()
pool_subscriber = pool_context.socket(zmq.PULL) pool_subscriber = pool_context.socket(zmq.PULL)
pool_subscriber.connect(port) pool_subscriber.connect(port)
while True: while True:
message = pool_subscriber.recv_json() message = pool_subscriber.recv_json()
self.process_job_event(message) self.process_job_event(message)
messages_processed += 1
if messages_processed >= MAX_REQUESTS:
print("Shutting down message receiver")
pool_subscriber.close()
sys.exit(0)
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''