1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge branch 'fix_fact_caching' into devel

* fix_fact_caching:
  Convert the fact data structure to the right type
  Fixing up fact cachine related unit tests
  Port fact caching system to rabbitmq
This commit is contained in:
Matthew Jones 2016-11-21 23:15:13 -05:00
commit ddaf4ff2d0
7 changed files with 64 additions and 115 deletions

View File

@ -5,6 +5,7 @@
import copy import copy
import json import json
import re import re
import six
import logging import logging
from collections import OrderedDict from collections import OrderedDict
from dateutil import rrule from dateutil import rrule
@ -3041,3 +3042,11 @@ class FactSerializer(BaseFactSerializer):
res = super(FactSerializer, self).get_related(obj) res = super(FactSerializer, self).get_related(obj)
res['host'] = obj.host.get_absolute_url() res['host'] = obj.host.get_absolute_url()
return res return res
def to_representation(self, obj):
ret = super(FactSerializer, self).to_representation(obj)
if obj is None:
return ret
if 'facts' in ret and isinstance(ret['facts'], six.string_types):
ret['facts'] = json.loads(ret['facts'])
return ret

View File

@ -3,9 +3,11 @@
# Python # Python
import logging import logging
from threading import Thread
from datetime import datetime from datetime import datetime
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
# Django # Django
from django.core.management.base import NoArgsCommand from django.core.management.base import NoArgsCommand
from django.conf import settings from django.conf import settings
@ -14,15 +16,23 @@ from django.utils import timezone
# AWX # AWX
from awx.main.models.fact import Fact from awx.main.models.fact import Fact
from awx.main.models.inventory import Host from awx.main.models.inventory import Host
from awx.main.socket_queue import Socket
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
class FactCacheReceiver(object): class FactBrokerWorker(ConsumerMixin):
def __init__(self):
def __init__(self, connection):
self.connection = connection
self.timestamp = None self.timestamp = None
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[Queue(settings.FACT_QUEUE,
Exchange(settings.FACT_QUEUE, type='direct'),
routing_key=settings.FACT_QUEUE)],
accept=['json'],
callbacks=[self.process_fact_message])]
def _determine_module(self, facts): def _determine_module(self, facts):
# Symantically determine the module type # Symantically determine the module type
if len(facts) == 1: if len(facts) == 1:
@ -40,17 +50,13 @@ class FactCacheReceiver(object):
facts = self._extract_module_facts(module, facts) facts = self._extract_module_facts(module, facts)
return (module, facts) return (module, facts)
def process_fact_message(self, message): def process_fact_message(self, body, message):
hostname = message['host'] print body
inventory_id = message['inventory_id'] print type(body)
facts_data = message['facts'] hostname = body['host']
date_key = message['date_key'] inventory_id = body['inventory_id']
facts_data = body['facts']
# TODO: in ansible < v2 module_setup is emitted for "smart" fact caching. date_key = body['date_key']
# ansible v2 will not emit this message. Thus, this can be removed at that time.
if 'module_setup' in facts_data and len(facts_data) == 1:
logger.info('Received module_setup message')
return None
try: try:
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
@ -79,32 +85,18 @@ class FactCacheReceiver(object):
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name)) logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
return fact_obj return fact_obj
def run_receiver(self, use_processing_threads=True):
with Socket('fact_cache', 'r') as facts:
for message in facts.listen():
if 'host' not in message or 'facts' not in message or 'date_key' not in message:
logger.warn('Received invalid message %s' % message)
continue
logger.info('Received message %s' % message)
if use_processing_threads:
wt = Thread(target=self.process_fact_message, args=(message,))
wt.start()
else:
self.process_fact_message(message)
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''
blah blah Save Fact Event packets to the database as emitted from a Tower Scan Job
''' '''
help = 'Launch the Fact Cache Receiver' help = 'Launch the Fact Cache Receiver'
def handle_noargs(self, **options): def handle_noargs(self, **options):
fcr = FactCacheReceiver() with Connection(settings.BROKER_URL) as conn:
fact_cache_port = settings.FACT_CACHE_PORT try:
logger.info('Listening on port http://0.0.0.0:' + str(fact_cache_port)) worker = FactBrokerWorker(conn)
try: worker.run()
fcr.run_receiver() except KeyboardInterrupt:
except KeyboardInterrupt: pass
pass

View File

@ -2,7 +2,6 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import json
import logging import logging
import os import os
@ -12,37 +11,7 @@ from django.conf import settings
# Kombu # Kombu
from kombu import Connection, Exchange, Producer from kombu import Connection, Exchange, Producer
__all__ = ['FifoQueue', 'CallbackQueueDispatcher'] __all__ = ['CallbackQueueDispatcher']
# TODO: Figure out wtf to do with this class
class FifoQueue(object):
"""An abstraction class implemented for a simple push/pull queue.
Intended to allow alteration of backend details in a single, consistent
way throughout the Tower application.
"""
def __init__(self, queue_name):
"""Instantiate a queue object, which is able to interact with a
particular queue.
"""
self._queue_name = queue_name
def __len__(self):
"""Return the length of the Redis list."""
#return redis.llen(self._queue_name)
return 0
def push(self, value):
"""Push a value onto the right side of the queue."""
#redis.rpush(self._queue_name, json.dumps(value))
def pop(self):
"""Retrieve a value from the left side of the queue."""
#answer = redis.lpop(self._queue_name)
answer = None
if answer:
return json.loads(answer)
class CallbackQueueDispatcher(object): class CallbackQueueDispatcher(object):

View File

@ -852,6 +852,7 @@ class RunJob(BaseTask):
# Set environment variables related to scan jobs # Set environment variables related to scan jobs
if job.job_type == PERM_INVENTORY_SCAN: if job.job_type == PERM_INVENTORY_SCAN:
env['FACT_QUEUE'] = settings.FACT_QUEUE
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower" env['ANSIBLE_CACHE_PLUGIN'] = "tower"

View File

@ -10,7 +10,7 @@ import json
from django.utils import timezone from django.utils import timezone
# AWX # AWX
from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiver from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker
from awx.main.models.fact import Fact from awx.main.models.fact import Fact
from awx.main.models.inventory import Host from awx.main.models.inventory import Host
@ -40,25 +40,22 @@ def check_process_fact_message_module(fact_returned, data, module_name):
@pytest.mark.django_db @pytest.mark.django_db
def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save): def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save):
receiver = FactCacheReceiver() receiver = FactBrokerWorker(None)
fact_returned = receiver.process_fact_message(fact_msg_ansible) fact_returned = receiver.process_fact_message(fact_msg_ansible, None)
check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible') check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible')
@pytest.mark.django_db @pytest.mark.django_db
def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save): def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save):
receiver = FactCacheReceiver() receiver = FactBrokerWorker(None)
fact_returned = receiver.process_fact_message(fact_msg_packages) fact_returned = receiver.process_fact_message(fact_msg_packages, None)
check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages') check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages')
@pytest.mark.django_db @pytest.mark.django_db
def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save): def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save):
receiver = FactCacheReceiver() receiver = FactBrokerWorker(None)
fact_returned = receiver.process_fact_message(fact_msg_services) fact_returned = receiver.process_fact_message(fact_msg_services, None)
check_process_fact_message_module(fact_returned, fact_msg_services, 'services') check_process_fact_message_module(fact_returned, fact_msg_services, 'services')
@ -77,25 +74,12 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, m
key = 'ansible.overwrite' key = 'ansible.overwrite'
value = 'hello world' value = 'hello world'
receiver = FactCacheReceiver() receiver = FactBrokerWorker(None)
receiver.process_fact_message(fact_msg_ansible) receiver.process_fact_message(fact_msg_ansible, None)
fact_msg_ansible['facts'][key] = value fact_msg_ansible['facts'][key] = value
fact_returned = receiver.process_fact_message(fact_msg_ansible) fact_returned = receiver.process_fact_message(fact_msg_ansible, None)
fact_obj = Fact.objects.get(id=fact_returned.id) fact_obj = Fact.objects.get(id=fact_returned.id)
assert key in fact_obj.facts assert key in fact_obj.facts
assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
# Ensure that the message flows from the socket through to process_fact_message()
@pytest.mark.django_db
def test_run_receiver(mocker, fact_msg_ansible):
mocker.patch("awx.main.socket_queue.Socket.listen", return_value=[fact_msg_ansible])
receiver = FactCacheReceiver()
mocker.patch.object(receiver, 'process_fact_message', return_value=None)
receiver.run_receiver(use_processing_threads=False)
receiver.process_fact_message.assert_called_once_with(fact_msg_ansible)

View File

@ -29,20 +29,15 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
import sys
import os import os
import time import time
from ansible import constants as C
try: try:
from ansible.cache.base import BaseCacheModule from ansible.cache.base import BaseCacheModule
except: except:
from ansible.plugins.cache.base import BaseCacheModule from ansible.plugins.cache.base import BaseCacheModule
try: from kombu import Connection, Exchange, Producer
import zmq
except ImportError:
print("pyzmq is required")
sys.exit(1)
class CacheModule(BaseCacheModule): class CacheModule(BaseCacheModule):
@ -52,19 +47,12 @@ class CacheModule(BaseCacheModule):
self._cache = {} self._cache = {}
self._all_keys = {} self._all_keys = {}
# This is the local tower zmq connection
self._tower_connection = C.CACHE_PLUGIN_CONNECTION
self.date_key = time.time() self.date_key = time.time()
try: self.callback_connection = os.environ['CALLBACK_CONNECTION']
self.context = zmq.Context() self.callback_queue = os.environ['FACT_QUEUE']
self.socket = self.context.socket(zmq.REQ) self.connection = Connection(self.callback_connection)
self.socket.setsockopt(zmq.RCVTIMEO, 4000) self.exchange = Exchange(self.callback_queue, type='direct')
self.socket.setsockopt(zmq.LINGER, 2000) self.producer = Producer(self.connection)
self.socket.connect(self._tower_connection)
except Exception, e:
print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection),
str(e)))
sys.exit(1)
def filter_ansible_facts(self, facts): def filter_ansible_facts(self, facts):
return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_')) return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_'))
@ -117,8 +105,12 @@ class CacheModule(BaseCacheModule):
} }
# Emit fact data to tower for processing # Emit fact data to tower for processing
self.socket.send_json(packet) self.producer.publish(packet,
self.socket.recv() serializer='json',
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
routing_key=self.callback_queue)
def keys(self): def keys(self):
return self._cache.keys() return self._cache.keys()

View File

@ -798,6 +798,8 @@ INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
USE_CALLBACK_QUEUE = True USE_CALLBACK_QUEUE = True
CALLBACK_QUEUE = "callback_tasks" CALLBACK_QUEUE = "callback_tasks"
FACT_QUEUE = "facts"
SCHEDULER_QUEUE = "scheduler" SCHEDULER_QUEUE = "scheduler"
TASK_COMMAND_PORT = 6559 TASK_COMMAND_PORT = 6559