From 3b07d9745deaaacbdd0b38556a19e3c7b555710f Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Fri, 18 Nov 2016 09:44:17 -0500
Subject: [PATCH 1/3] Port fact caching system to rabbitmq

* Purge all references to zmq also
* New setting to control the queue it works on
---
 .../commands/run_fact_cache_receiver.py       | 77 +++++++++++--------
 awx/main/queue.py                             | 33 +-------
 awx/main/tasks.py                             |  1 +
 awx/plugins/fact_caching/tower.py             | 36 ++++-----
 awx/settings/defaults.py                      |  2 +
 5 files changed, 63 insertions(+), 86 deletions(-)

diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py
index afc6095309..78bf88ffb4 100644
--- a/awx/main/management/commands/run_fact_cache_receiver.py
+++ b/awx/main/management/commands/run_fact_cache_receiver.py
@@ -6,6 +6,9 @@ import logging
 from threading import Thread
 from datetime import datetime
 
+from kombu import Connection, Exchange, Queue
+from kombu.mixins import ConsumerMixin
+
 # Django
 from django.core.management.base import NoArgsCommand
 from django.conf import settings
@@ -23,6 +26,34 @@ class FactCacheReceiver(object):
     def __init__(self):
         self.timestamp = None
 
+
+    def run_receiver(self, use_processing_threads=True):
+        with Socket('fact_cache', 'r') as facts:
+            for message in facts.listen():
+                if 'host' not in message or 'facts' not in message or 'date_key' not in message:
+                    logger.warn('Received invalid message %s' % message)
+                    continue
+                logger.info('Received message %s' % message)
+                if use_processing_threads:
+                    wt = Thread(target=self.process_fact_message, args=(message,))
+                    wt.start()
+                else:
+                    self.process_fact_message(message)
+
+
+class FactBrokerWorker(ConsumerMixin):
+
+    def __init__(self, connection):
+        self.connection = connection
+        self.timestamp = None
+
+    def get_consumers(self, Consumer, channel):
+        return [Consumer(queues=[Queue(settings.FACT_QUEUE,
+                                       Exchange(settings.FACT_QUEUE, type='direct'),
+                                       routing_key=settings.FACT_QUEUE)],
+                         accept=['json'],
+                         callbacks=[self.process_fact_message])]
+
     def _determine_module(self, facts):
         # Semantically determine the module type
         if len(facts) == 1:
@@ -40,17 +71,13 @@ class FactCacheReceiver(object):
         facts = self._extract_module_facts(module, facts)
         return (module, facts)
 
-    def process_fact_message(self, message):
-        hostname = message['host']
-        inventory_id = message['inventory_id']
-        facts_data = message['facts']
-        date_key = message['date_key']
-
-        # TODO: in ansible < v2 module_setup is emitted for "smart" fact caching.
-        # ansible v2 will not emit this message. Thus, this can be removed at that time.
-        if 'module_setup' in facts_data and len(facts_data) == 1:
-            logger.info('Received module_setup message')
-            return None
+    def process_fact_message(self, body, message):
+        # body is the decoded json payload; message is the kombu Message
+        # object it arrived wrapped in.
+        hostname = body['host']
+        inventory_id = body['inventory_id']
+        facts_data = body['facts']
+        date_key = body['date_key']
 
         try:
             host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
@@ -79,32 +106,18 @@ class FactCacheReceiver(object):
             logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name))
         return fact_obj
 
-    def run_receiver(self, use_processing_threads=True):
-        with Socket('fact_cache', 'r') as facts:
-            for message in facts.listen():
-                if 'host' not in message or 'facts' not in message or 'date_key' not in message:
-                    logger.warn('Received invalid message %s' % message)
-                    continue
-                logger.info('Received message %s' % message)
-                if use_processing_threads:
-                    wt = Thread(target=self.process_fact_message, args=(message,))
-                    wt.start()
-                else:
-                    self.process_fact_message(message)
-
 
 class Command(NoArgsCommand):
     '''
-    blah blah
+    Save Fact Event packets to the database as emitted from a Tower Scan Job
    '''
     help = 'Launch the Fact Cache Receiver'
 
     def handle_noargs(self, **options):
-        fcr = FactCacheReceiver()
-        fact_cache_port = settings.FACT_CACHE_PORT
-        logger.info('Listening on port http://0.0.0.0:' + str(fact_cache_port))
-        try:
-            fcr.run_receiver()
-        except KeyboardInterrupt:
-            pass
+        with Connection(settings.BROKER_URL) as conn:
+            try:
+                worker = FactBrokerWorker(conn)
+                worker.run()
+            except KeyboardInterrupt:
+                pass
diff --git a/awx/main/queue.py b/awx/main/queue.py
index bfb487441f..541724405f 100644
--- a/awx/main/queue.py
+++ b/awx/main/queue.py
@@ -2,7 +2,6 @@
 # All Rights Reserved.
 
 # Python
-import json
 import logging
 import os
 
@@ -12,37 +11,7 @@ from django.conf import settings
 # Kombu
 from kombu import Connection, Exchange, Producer
 
-__all__ = ['FifoQueue', 'CallbackQueueDispatcher']
-
-
-# TODO: Figure out wtf to do with this class
-class FifoQueue(object):
-    """An abstraction class implemented for a simple push/pull queue.
-
-    Intended to allow alteration of backend details in a single, consistent
-    way throughout the Tower application.
-    """
-    def __init__(self, queue_name):
-        """Instantiate a queue object, which is able to interact with a
-        particular queue.
- """ - self._queue_name = queue_name - - def __len__(self): - """Return the length of the Redis list.""" - #return redis.llen(self._queue_name) - return 0 - - def push(self, value): - """Push a value onto the right side of the queue.""" - #redis.rpush(self._queue_name, json.dumps(value)) - - def pop(self): - """Retrieve a value from the left side of the queue.""" - #answer = redis.lpop(self._queue_name) - answer = None - if answer: - return json.loads(answer) +__all__ = ['CallbackQueueDispatcher'] class CallbackQueueDispatcher(object): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 47a6583e9a..92ed8b7a5d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -852,6 +852,7 @@ class RunJob(BaseTask): # Set environment variables related to scan jobs if job.job_type == PERM_INVENTORY_SCAN: + env['FACT_QUEUE'] = settings.FACT_QUEUE env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') env['ANSIBLE_CACHE_PLUGIN'] = "tower" diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index a956a5f50b..00f280a884 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -29,20 +29,15 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import sys import os import time -from ansible import constants as C + try: from ansible.cache.base import BaseCacheModule except: from ansible.plugins.cache.base import BaseCacheModule -try: - import zmq -except ImportError: - print("pyzmq is required") - sys.exit(1) +from kombu import Connection, Exchange, Producer class CacheModule(BaseCacheModule): @@ -52,20 +47,13 @@ class CacheModule(BaseCacheModule): self._cache = {} self._all_keys = {} - # This is the local tower zmq connection - self._tower_connection = C.CACHE_PLUGIN_CONNECTION self.date_key = time.time() - try: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.RCVTIMEO, 4000) - self.socket.setsockopt(zmq.LINGER, 2000) - self.socket.connect(self._tower_connection) - except Exception, e: - print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection), - str(e))) - sys.exit(1) - + self.callback_connection = os.environ['CALLBACK_CONNECTION'] + self.callback_queue = os.environ['FACT_QUEUE'] + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.callback_queue, type='direct') + self.producer = Producer(self.connection) + def filter_ansible_facts(self, facts): return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_')) @@ -117,8 +105,12 @@ class CacheModule(BaseCacheModule): } # Emit fact data to tower for processing - self.socket.send_json(packet) - self.socket.recv() + self.producer.publish(packet, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.callback_queue) def keys(self): return self._cache.keys() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 69cd7f232b..01264de6ba 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -797,6 +797,8 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT CALLBACK_QUEUE = "callback_tasks" +FACT_QUEUE = "facts" + SCHEDULER_QUEUE = "scheduler" TASK_COMMAND_PORT = 6559 From 35b9786e06c129ef4ec89f2dfb1a5056f43ede61 Mon Sep 17 00:00:00 2001 From: Matthew 
Date: Fri, 18 Nov 2016 10:23:07 -0500
Subject: [PATCH 2/3] Fixing up fact caching related unit tests

---
 .../commands/run_fact_cache_receiver.py       | 21 ------------
 .../commands/test_run_fact_cache_receiver.py  | 33 ++++++-------------
 awx/plugins/fact_caching/tower.py             |  2 +-
 3 files changed, 11 insertions(+), 45 deletions(-)

diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py
index 78bf88ffb4..9796e1db6c 100644
--- a/awx/main/management/commands/run_fact_cache_receiver.py
+++ b/awx/main/management/commands/run_fact_cache_receiver.py
@@ -3,7 +3,6 @@
 
 # Python
 import logging
-from threading import Thread
 from datetime import datetime
 
 from kombu import Connection, Exchange, Queue
@@ -17,30 +16,10 @@ from django.utils import timezone
 # AWX
 from awx.main.models.fact import Fact
 from awx.main.models.inventory import Host
-from awx.main.socket_queue import Socket
 
 logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
 
 
-class FactCacheReceiver(object):
-    def __init__(self):
-        self.timestamp = None
-
-
-    def run_receiver(self, use_processing_threads=True):
-        with Socket('fact_cache', 'r') as facts:
-            for message in facts.listen():
-                if 'host' not in message or 'facts' not in message or 'date_key' not in message:
-                    logger.warn('Received invalid message %s' % message)
-                    continue
-                logger.info('Received message %s' % message)
-                if use_processing_threads:
-                    wt = Thread(target=self.process_fact_message, args=(message,))
-                    wt.start()
-                else:
-                    self.process_fact_message(message)
-
-
 class FactBrokerWorker(ConsumerMixin):
 
     def __init__(self, connection):
diff --git a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py
index 2042d9e426..87b729dda8 100644
--- a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py
+++ b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py
@@ -10,7 +10,7 @@ import json
 from django.utils import timezone
 
 # AWX
-from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiver
+from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker
 from awx.main.models.fact import Fact
 from awx.main.models.inventory import Host
 
@@ -40,24 +40,24 @@ def check_process_fact_message_module(fact_returned, data, module_name):
 
 @pytest.mark.django_db
 def test_process_fact_message_ansible(fact_msg_ansible):
-    receiver = FactCacheReceiver()
-    fact_returned = receiver.process_fact_message(fact_msg_ansible)
+    receiver = FactBrokerWorker(None)
+    fact_returned = receiver.process_fact_message(fact_msg_ansible, None)
 
     check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible')
 
 
 @pytest.mark.django_db
 def test_process_fact_message_packages(fact_msg_packages):
-    receiver = FactCacheReceiver()
-    fact_returned = receiver.process_fact_message(fact_msg_packages)
+    receiver = FactBrokerWorker(None)
+    fact_returned = receiver.process_fact_message(fact_msg_packages, None)
 
     check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages')
 
 
 @pytest.mark.django_db
 def test_process_fact_message_services(fact_msg_services):
-    receiver = FactCacheReceiver()
-    fact_returned = receiver.process_fact_message(fact_msg_services)
+    receiver = FactBrokerWorker(None)
+    fact_returned = receiver.process_fact_message(fact_msg_services, None)
 
     check_process_fact_message_module(fact_returned, fact_msg_services, 'services')
 
@@ -77,25 +77,12 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
     key = 'ansible.overwrite'
     value = 'hello world'
 
-    receiver = FactCacheReceiver()
-    receiver.process_fact_message(fact_msg_ansible)
+    receiver = FactBrokerWorker(None)
+    receiver.process_fact_message(fact_msg_ansible, None)
 
     fact_msg_ansible['facts'][key] = value
-    fact_returned = receiver.process_fact_message(fact_msg_ansible)
+    fact_returned = receiver.process_fact_message(fact_msg_ansible, None)
 
     fact_obj = Fact.objects.get(id=fact_returned.id)
     assert key in fact_obj.facts
     assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
-
-
-# Ensure that the message flows from the socket through to process_fact_message()
-@pytest.mark.django_db
-def test_run_receiver(mocker, fact_msg_ansible):
-    mocker.patch("awx.main.socket_queue.Socket.listen", return_value=[fact_msg_ansible])
-
-    receiver = FactCacheReceiver()
-    mocker.patch.object(receiver, 'process_fact_message', return_value=None)
-
-    receiver.run_receiver(use_processing_threads=False)
-
-    receiver.process_fact_message.assert_called_once_with(fact_msg_ansible)
diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py
index 00f280a884..57d413f018 100755
--- a/awx/plugins/fact_caching/tower.py
+++ b/awx/plugins/fact_caching/tower.py
@@ -53,7 +53,7 @@ class CacheModule(BaseCacheModule):
         self.connection = Connection(self.callback_connection)
         self.exchange = Exchange(self.callback_queue, type='direct')
         self.producer = Producer(self.connection)
-    
+
     def filter_ansible_facts(self, facts):
         return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_'))
 

From f77caf8b8bc2050b98c3161f634ae7480e3229e5 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Fri, 18 Nov 2016 12:13:09 -0500
Subject: [PATCH 3/3] Convert the fact data structure to the right type

---
 awx/api/serializers.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 2148ed5ac2..4c5fee889b 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -5,6 +5,7 @@
 import copy
 import json
 import re
+import six
 import logging
 from collections import OrderedDict
 from dateutil import rrule
@@ -3037,3 +3038,11 @@ class FactSerializer(BaseFactSerializer):
         res = super(FactSerializer, self).get_related(obj)
         res['host'] = obj.host.get_absolute_url()
         return res
+
+    def to_representation(self, obj):
+        ret = super(FactSerializer, self).to_representation(obj)
+        if obj is None:
+            return ret
+        if 'facts' in ret and isinstance(ret['facts'], six.string_types):
+            ret['facts'] = json.loads(ret['facts'])
+        return ret
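
A minimal, self-contained sketch of the kombu publish/consume round trip
these patches move the fact cache onto, for anyone reviewing who has not used
kombu before. The broker URL, queue name, and sample payload below are
illustrative assumptions for local experimentation, not values taken from the
patches; Tower itself pulls them from settings.BROKER_URL and
settings.FACT_QUEUE.

    from kombu import Connection, Exchange, Queue, Producer
    from kombu.mixins import ConsumerMixin

    FACT_QUEUE = 'facts'  # assumption: mirrors the new default in defaults.py
    exchange = Exchange(FACT_QUEUE, type='direct')
    queue = Queue(FACT_QUEUE, exchange, routing_key=FACT_QUEUE)


    class Receiver(ConsumerMixin):
        # ConsumerMixin drives the consume loop; it only needs a `connection`
        # attribute and a get_consumers() implementation.
        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            # accept=['json'] makes kombu decode the payload before the
            # callback fires, so `body` arrives as a dict -- the same
            # (body, message) signature process_fact_message() now takes.
            return [Consumer(queues=[queue], accept=['json'],
                             callbacks=[self.on_message])]

        def on_message(self, body, message):
            print('got facts for host %s' % body.get('host'))
            message.ack()


    if __name__ == '__main__':
        # assumption: a RabbitMQ broker is reachable on localhost
        with Connection('amqp://guest:guest@localhost:5672//') as conn:
            producer = Producer(conn)
            # declare=[queue] creates the queue and its binding up front, so
            # the message is not dropped before any consumer has bound it
            producer.publish({'host': 'db01', 'inventory_id': 1,
                              'facts': {}, 'date_key': 0},
                             serializer='json', exchange=exchange,
                             declare=[queue], routing_key=FACT_QUEUE)
            Receiver(conn).run()  # blocks; stop with Ctrl-C

Note the explicit message.ack() in the sketch; the FactBrokerWorker callback
in patch 1 never acks the message, which may be worth a second look during
review.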