1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

Pull results off zeromq and distribute to workers

This commit is contained in:
Matthew Jones 2014-02-12 16:09:57 -05:00
parent 063380304a
commit 770947d18d
2 changed files with 55 additions and 34 deletions

View File

@ -6,6 +6,7 @@ import datetime
import logging import logging
import json import json
from optparse import make_option from optparse import make_option
from multiprocessing import Process
# Django # Django
from django.conf import settings from django.conf import settings
@ -22,26 +23,16 @@ from awx.main.models import *
# ZeroMQ # ZeroMQ
import zmq import zmq
class Command(NoArgsCommand): class Worker(Process):
'''
Management command to run the job callback receiver
'''
help = 'Launch the job callback receiver' def run(self):
print("Starting worker")
option_list = NoArgsCommand.option_list + ( pool_context = zmq.Context()
make_option('--port', dest='port', type='int', default=5556, pool_subscriber = pool_context.socket(zmq.PULL)
help='Port to listen for requests on'),) pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc")
while True:
def init_logging(self): message = pool_subscriber.recv_json()
log_levels = dict(enumerate([logging.ERROR, logging.INFO, self.process_job_event(message)
logging.DEBUG, 0]))
self.logger = logging.getLogger('awx.main.commands.run_callback_receiver')
self.logger.setLevel(log_levels.get(self.verbosity, 0))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.propagate = False
@transaction.commit_on_success @transaction.commit_on_success
def process_job_event(self, data): def process_job_event(self, data):
@ -79,25 +70,56 @@ class Command(NoArgsCommand):
break break
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug('Database error saving job event, retrying in ' # logger.debug('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e) # '1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1) time.sleep(1)
else: else:
logger.error('Failed to save job event after %d retries.', logger.error('Failed to save job event after %d retries.',
retry_count) retry_count)
class Command(NoArgsCommand):
'''
Management command to run the job callback receiver
'''
help = 'Launch the job callback receiver'
option_list = NoArgsCommand.option_list + (
make_option('--port', dest='port', type='int', default=5556,
help='Port to listen for requests on'),)
def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO,
logging.DEBUG, 0]))
self.logger = logging.getLogger('awx.main.commands.run_callback_receiver')
self.logger.setLevel(log_levels.get(self.verbosity, 0))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.propagate = False
def run_subscriber(self, port=5556): def run_subscriber(self, port=5556):
print("Starting ZMQ Context")
context = zmq.Context() consumer_context = zmq.Context()
subscriber = context.socket(zmq.REP) consumer_subscriber = consumer_context.socket(zmq.PULL)
print("Starting connection") consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port))
subscriber.bind("tcp://127.0.0.1:%s" % str(port)) print("Consumer Listening on tcp://127.0.0.1:%s" % str(port))
print("Listening on tcp://127.0.0.1:%s" % str(port))
queue_context = zmq.Context()
queue_publisher = queue_context.socket(zmq.PUSH)
queue_publisher.bind("ipc:///tmp/callback_receiver.ipc")
print("Publisher Listening on ipc: /tmp/callback_receiver.ip")
workers = []
for idx in range(4):
w = Worker()
w.start()
workers.append(w)
while True: # Handle signal while True: # Handle signal
message = subscriber.recv() message = consumer_subscriber.recv_json()
subscriber.send("1") queue_publisher.send_json(message)
data = json.loads(message)
self.process_job_event(data)
def handle_noargs(self, **options): def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))

View File

@ -107,7 +107,7 @@ class CallbackModule(object):
def _start_connection(self): def _start_connection(self):
self.context = zmq.Context() self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ) self.socket = self.context.socket(zmq.PUSH)
self.socket.connect("tcp://127.0.0.1:5556") self.socket.connect("tcp://127.0.0.1:5556")
def _post_job_event_queue_msg(self, event, event_data): def _post_job_event_queue_msg(self, event, event_data):
@ -130,9 +130,8 @@ class CallbackModule(object):
if self.context is None: if self.context is None:
self._start_connection() self._start_connection()
self.socket.send(json.dumps(msg)) self.socket.send_json(msg)
self.logger.debug('Publish: %r, retry=%d', msg, retry_count) self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
reply = self.socket.recv()
return return
except Exception, e: except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e, self.logger.info('Publish Exception: %r, retry=%d', e,