From 8d90650134d44bd0d071bdaf861de53a0e0cfa7f Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Wed, 29 Oct 2014 13:51:47 -0500 Subject: [PATCH] Shifting job callbacks to ZeroMQ. --- .../commands/run_callback_receiver.py | 32 +++++++------------ awx/main/queue.py | 2 +- awx/plugins/callback/job_event_callback.py | 22 +++---------- 3 files changed, 17 insertions(+), 39 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 3eeadaa8b0..1cb8e2f30a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -24,19 +24,17 @@ from django.db import connection # AWX from awx.main.models import * - -# ZeroMQ -import zmq +from awx.main.queue import PubSub MAX_REQUESTS = 10000 WORKERS = 4 -class CallbackReceiver(object): +class CallbackReceiver(object): def __init__(self): self.parent_mappings = {} - def run_subscriber(self, consumer_port, queue_port, use_workers=True): + def run_subscriber(self, use_workers=True): def shutdown_handler(active_workers): def _handler(signum, frame): for active_worker in active_workers: @@ -67,7 +65,10 @@ class CallbackReceiver(object): elif settings.DEBUG: print 'Started callback receiver (no workers)' - main_process = Process(target=self.callback_handler, args=(use_workers, consumer_port, worker_queues,)) + main_process = Process( + target=self.callback_handler, + args=(use_workers, worker_queues,), + ) main_process.daemon = True main_process.start() @@ -88,16 +89,12 @@ class CallbackReceiver(object): sys.exit(1) time.sleep(0.1) - def callback_handler(self, use_workers, consumer_port, worker_queues): + def callback_handler(self, use_workers, worker_queues): message_number = 0 total_messages = 0 last_parent_events = {} - self.consumer_context = zmq.Context() - self.consumer_subscriber = self.consumer_context.socket(zmq.REP) - self.consumer_subscriber.bind(consumer_port) - while True: # Handle signal - message = self.consumer_subscriber.recv_json() + for message in pubsub.subscribe('callbacks'): total_messages += 1 if not use_workers: self.process_job_event(message) @@ -232,14 +229,9 @@ class Command(NoArgsCommand): Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback) Runs as a management command and receives job save events. It then hands them off to worker processors (see Worker) which writes them to the database - ''' - + ''' help = 'Launch the job callback receiver' - option_list = NoArgsCommand.option_list + ( - make_option('--port', dest='port', type='int', default=5556, - help='Port to listen for requests on'),) - def init_logging(self): log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0])) @@ -253,11 +245,9 @@ class Command(NoArgsCommand): def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() - consumer_port = settings.CALLBACK_CONSUMER_PORT - queue_port = settings.CALLBACK_QUEUE_PORT cr = CallbackReceiver() try: - cr.run_subscriber(consumer_port, queue_port) + cr.run_subscriber() except KeyboardInterrupt: pass diff --git a/awx/main/queue.py b/awx/main/queue.py index ecc2f2ef02..777a1a4981 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -76,7 +76,7 @@ class PubSub(object): from contextmanager import closing with closing(PubSub('foobar')) as foobar: - for message in foobar.listen(wait=0.1): + for message in foobar.subscribe(wait=0.1): """ self._queue_name = queue_name diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index e58feab6e7..92e21ba24d 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -42,9 +42,8 @@ import time # Requests import requests -# ZeroMQ -import zmq - +# Tower +from awx.main.queue import PubSub class TokenAuth(requests.auth.AuthBase): @@ -80,9 +79,6 @@ class CallbackModule(object): self.job_id = int(os.getenv('JOB_ID')) self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') - self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '') - self.context = None - self.socket = None self._init_logging() self._init_connection() self.counter = 0 @@ -109,11 +105,6 @@ class CallbackModule(object): self.context = None self.socket = None - def _start_connection(self): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.socket.connect(self.callback_consumer_port) - def _post_job_event_queue_msg(self, event, event_data): self.counter += 1 msg = { @@ -132,13 +123,10 @@ class CallbackModule(object): try: if not hasattr(self, 'connection_pid'): self.connection_pid = active_pid - if self.connection_pid != active_pid: - self._init_connection() - if self.context is None: - self._start_connection() - self.socket.send_json(msg) - self.socket.recv() + # Publish the callback through Redis. + pubsub = PubSub('callbacks') + pubsub.publish(msg) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e,