diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 2d9a21a8da..ff84bf3701 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -6,6 +6,7 @@ import datetime import logging import json from optparse import make_option +from multiprocessing import Process # Django from django.conf import settings @@ -22,26 +23,16 @@ from awx.main.models import * # ZeroMQ import zmq -class Command(NoArgsCommand): - ''' - Management command to run the job callback receiver - ''' +class Worker(Process): - help = 'Launch the job callback receiver' - - option_list = NoArgsCommand.option_list + ( - make_option('--port', dest='port', type='int', default=5556, - help='Port to listen for requests on'),) - - def init_logging(self): - log_levels = dict(enumerate([logging.ERROR, logging.INFO, - logging.DEBUG, 0])) - self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') - self.logger.setLevel(log_levels.get(self.verbosity, 0)) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(handler) - self.logger.propagate = False + def run(self): + print("Starting worker") + pool_context = zmq.Context() + pool_subscriber = pool_context.socket(zmq.PULL) + pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc") + while True: + message = pool_subscriber.recv_json() + self.process_job_event(message) @transaction.commit_on_success def process_job_event(self, data): @@ -79,25 +70,56 @@ class Command(NoArgsCommand): break except DatabaseError as e: transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) + # logger.debug('Database error saving job event, retrying in ' + # '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) else: logger.error('Failed to save job event after %d retries.', retry_count) + +class Command(NoArgsCommand): + ''' + Management command to run the job callback receiver + ''' + + help = 'Launch the job callback receiver' + + option_list = NoArgsCommand.option_list + ( + make_option('--port', dest='port', type='int', default=5556, + help='Port to listen for requests on'),) + + def init_logging(self): + log_levels = dict(enumerate([logging.ERROR, logging.INFO, + logging.DEBUG, 0])) + self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') + self.logger.setLevel(log_levels.get(self.verbosity, 0)) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(handler) + self.logger.propagate = False + def run_subscriber(self, port=5556): - print("Starting ZMQ Context") - context = zmq.Context() - subscriber = context.socket(zmq.REP) - print("Starting connection") - subscriber.bind("tcp://127.0.0.1:%s" % str(port)) - print("Listening on tcp://127.0.0.1:%s" % str(port)) + + consumer_context = zmq.Context() + consumer_subscriber = consumer_context.socket(zmq.PULL) + consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port)) + print("Consumer Listening on tcp://127.0.0.1:%s" % str(port)) + + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind("ipc:///tmp/callback_receiver.ipc") + print("Publisher Listening on ipc: /tmp/callback_receiver.ip") + + workers = [] + for idx in range(4): + w = Worker() + w.start() + workers.append(w) + while True: # Handle signal - message = subscriber.recv() - subscriber.send("1") - data = json.loads(message) - self.process_job_event(data) + message = consumer_subscriber.recv_json() + queue_publisher.send_json(message) def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 7fc1e7465d..4c33ed8345 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -107,7 +107,7 @@ class CallbackModule(object): def _start_connection(self): self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) + self.socket = self.context.socket(zmq.PUSH) self.socket.connect("tcp://127.0.0.1:5556") def _post_job_event_queue_msg(self, event, event_data): @@ -130,9 +130,8 @@ class CallbackModule(object): if self.context is None: self._start_connection() - self.socket.send(json.dumps(msg)) + self.socket.send_json(msg) self.logger.debug('Publish: %r, retry=%d', msg, retry_count) - reply = self.socket.recv() return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e,