mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
remove mongo fact tests
This commit is contained in:
parent
bc6eaeb0a9
commit
2e86c58a65
@ -1,2 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
@ -1,8 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from .models import * # noqa
|
||||
from .utils import * # noqa
|
||||
from .base import * # noqa
|
@ -1,223 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
from django.utils.timezone import now
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
import django
|
||||
|
||||
# MongoEngine
|
||||
from mongoengine.connection import get_db, ConnectionError
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
|
||||
TEST_FACT_ANSIBLE = {
|
||||
"ansible_swapfree_mb" : 4092,
|
||||
"ansible_default_ipv6" : {
|
||||
|
||||
},
|
||||
"ansible_distribution_release" : "trusty",
|
||||
"ansible_system_vendor" : "innotek GmbH",
|
||||
"ansible_os_family" : "Debian",
|
||||
"ansible_all_ipv4_addresses" : [
|
||||
"192.168.1.145"
|
||||
],
|
||||
"ansible_lsb" : {
|
||||
"release" : "14.04",
|
||||
"major_release" : "14",
|
||||
"codename" : "trusty",
|
||||
"id" : "Ubuntu",
|
||||
"description" : "Ubuntu 14.04.2 LTS"
|
||||
},
|
||||
}
|
||||
|
||||
TEST_FACT_PACKAGES = [
|
||||
{
|
||||
"name": "accountsservice",
|
||||
"architecture": "amd64",
|
||||
"source": "apt",
|
||||
"version": "0.6.35-0ubuntu7.1"
|
||||
},
|
||||
{
|
||||
"name": "acpid",
|
||||
"architecture": "amd64",
|
||||
"source": "apt",
|
||||
"version": "1:2.0.21-1ubuntu2"
|
||||
},
|
||||
{
|
||||
"name": "adduser",
|
||||
"architecture": "all",
|
||||
"source": "apt",
|
||||
"version": "3.113+nmu3ubuntu3"
|
||||
},
|
||||
]
|
||||
|
||||
TEST_FACT_SERVICES = [
|
||||
{
|
||||
"source" : "upstart",
|
||||
"state" : "waiting",
|
||||
"name" : "ureadahead-other",
|
||||
"goal" : "stop"
|
||||
},
|
||||
{
|
||||
"source" : "upstart",
|
||||
"state" : "running",
|
||||
"name" : "apport",
|
||||
"goal" : "start"
|
||||
},
|
||||
{
|
||||
"source" : "upstart",
|
||||
"state" : "waiting",
|
||||
"name" : "console-setup",
|
||||
"goal" : "stop"
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class MongoDBRequired(django.test.TestCase):
|
||||
def setUp(self):
|
||||
# Drop mongo database
|
||||
try:
|
||||
self.db = get_db()
|
||||
self.db.connection.drop_database(settings.MONGO_DB)
|
||||
except ConnectionError:
|
||||
self.skipTest('MongoDB connection failed')
|
||||
|
||||
class BaseFactTestMixin(MongoDBRequired):
|
||||
pass
|
||||
|
||||
class BaseFactTest(BaseFactTestMixin, MongoDBRequired):
|
||||
pass
|
||||
|
||||
# TODO: for now, we relate all hosts to a single inventory
|
||||
class FactScanBuilder(object):
|
||||
|
||||
def __init__(self):
|
||||
self.facts_data = {}
|
||||
self.hostname_data = []
|
||||
self.inventory_id = 1
|
||||
|
||||
self.host_objs = []
|
||||
self.fact_objs = []
|
||||
self.version_objs = []
|
||||
self.timestamps = []
|
||||
|
||||
self.epoch = now().replace(year=2015, microsecond=0)
|
||||
|
||||
def set_epoch(self, epoch):
|
||||
self.epoch = epoch
|
||||
|
||||
def add_fact(self, module, facts):
|
||||
self.facts_data[module] = facts
|
||||
|
||||
def add_hostname(self, hostname):
|
||||
self.hostname_data.append(hostname)
|
||||
|
||||
def build(self, scan_count, host_count):
|
||||
if len(self.facts_data) == 0:
|
||||
raise RuntimeError("No fact data to build populate scans. call add_fact()")
|
||||
if (len(self.hostname_data) > 0 and len(self.hostname_data) != host_count):
|
||||
raise RuntimeError("Registered number of hostnames %d does not match host_count %d" % (len(self.hostname_data), host_count))
|
||||
|
||||
if len(self.hostname_data) == 0:
|
||||
self.hostname_data = ['hostname_%s' % i for i in range(0, host_count)]
|
||||
|
||||
self.host_objs = [FactHost(hostname=hostname, inventory_id=self.inventory_id).save() for hostname in self.hostname_data]
|
||||
|
||||
for i in range(0, scan_count):
|
||||
scan = {}
|
||||
scan_version = {}
|
||||
timestamp = self.epoch.replace(year=self.epoch.year - i, microsecond=0)
|
||||
for module in self.facts_data:
|
||||
fact_objs = []
|
||||
version_objs = []
|
||||
for host in self.host_objs:
|
||||
(fact_obj, version_obj) = Fact.add_fact(timestamp=timestamp,
|
||||
host=host,
|
||||
module=module,
|
||||
fact=self.facts_data[module])
|
||||
fact_objs.append(fact_obj)
|
||||
version_objs.append(version_obj)
|
||||
scan[module] = fact_objs
|
||||
scan_version[module] = version_objs
|
||||
self.fact_objs.append(scan)
|
||||
self.version_objs.append(scan_version)
|
||||
self.timestamps.append(timestamp)
|
||||
|
||||
|
||||
def get_scan(self, index, module=None):
|
||||
res = None
|
||||
res = self.fact_objs[index]
|
||||
if module:
|
||||
res = res[module]
|
||||
return res
|
||||
|
||||
def get_scans(self, index_start=None, index_end=None):
|
||||
if index_start is None:
|
||||
index_start = 0
|
||||
if index_end is None:
|
||||
index_end = len(self.fact_objs)
|
||||
return self.fact_objs[index_start:index_end]
|
||||
|
||||
def get_scan_version(self, index, module=None):
|
||||
res = None
|
||||
res = self.version_objs[index]
|
||||
if module:
|
||||
res = res[module]
|
||||
return res
|
||||
|
||||
def get_scan_versions(self, index_start=None, index_end=None):
|
||||
if index_start is None:
|
||||
index_start = 0
|
||||
if index_end is None:
|
||||
index_end = len(self.version_objs)
|
||||
return self.version_objs[index_start:index_end]
|
||||
|
||||
def get_hostname(self, index):
|
||||
return self.host_objs[index].hostname
|
||||
|
||||
def get_hostnames(self, index_start=None, index_end=None):
|
||||
if index_start is None:
|
||||
index_start = 0
|
||||
if index_end is None:
|
||||
index_end = len(self.host_objs)
|
||||
|
||||
return [self.host_objs[i].hostname for i in range(index_start, index_end)]
|
||||
|
||||
def get_inventory_id(self):
|
||||
return self.inventory_id
|
||||
|
||||
def set_inventory_id(self, inventory_id):
|
||||
self.inventory_id = inventory_id
|
||||
|
||||
def get_host(self, index):
|
||||
return self.host_objs[index]
|
||||
|
||||
def get_hosts(self, index_start=None, index_end=None):
|
||||
if index_start is None:
|
||||
index_start = 0
|
||||
if index_end is None:
|
||||
index_end = len(self.host_objs)
|
||||
|
||||
return self.host_objs[index_start:index_end]
|
||||
|
||||
def get_scan_count(self):
|
||||
return len(self.fact_objs)
|
||||
|
||||
def get_host_count(self):
|
||||
return len(self.host_objs)
|
||||
|
||||
def get_timestamp(self, index):
|
||||
return self.timestamps[index]
|
||||
|
||||
def get_timestamps(self, index_start=None, index_end=None):
|
||||
if not index_start:
|
||||
index_start = 0
|
||||
if not index_end:
|
||||
len(self.timestamps)
|
||||
return self.timestamps[index_start:index_end]
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,6 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from .fact import * # noqa
|
@ -1,9 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from .fact_simple import * # noqa
|
||||
from .fact_transform_pymongo import * # noqa
|
||||
from .fact_transform import * # noqa
|
||||
from .fact_get_single_facts import * # noqa
|
@ -1,96 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
|
||||
# Django
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
from awx.fact.tests.base import BaseFactTest, FactScanBuilder, TEST_FACT_PACKAGES
|
||||
|
||||
__all__ = ['FactGetSingleFactsTest', 'FactGetSingleFactsMultipleScansTest',]
|
||||
|
||||
class FactGetSingleFactsTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactGetSingleFactsTest, self).setUp()
|
||||
self.builder = FactScanBuilder()
|
||||
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
|
||||
self.builder.add_fact('nested', TEST_FACT_PACKAGES)
|
||||
self.builder.build(scan_count=1, host_count=20)
|
||||
|
||||
def check_query_results(self, facts_known, facts):
|
||||
self.assertIsNotNone(facts)
|
||||
self.assertEqual(len(facts_known), len(facts), "More or less facts found than expected")
|
||||
# Ensure only 'acpid' is returned
|
||||
for fact in facts:
|
||||
self.assertEqual(len(fact.fact), 1)
|
||||
self.assertEqual(fact.fact[0]['name'], 'acpid')
|
||||
|
||||
# Transpose facts to a dict with key id
|
||||
count = 0
|
||||
facts_dict = {}
|
||||
for fact in facts:
|
||||
count += 1
|
||||
facts_dict[fact.id] = fact
|
||||
self.assertEqual(count, len(facts_known))
|
||||
|
||||
# For each fact that we put into the database on setup,
|
||||
# we should find that fact in the result set returned
|
||||
for fact_known in facts_known:
|
||||
key = fact_known.id
|
||||
self.assertIn(key, facts_dict)
|
||||
self.assertEqual(len(facts_dict[key].fact), 1)
|
||||
|
||||
def check_query_results_nested(self, facts):
|
||||
self.assertIsNotNone(facts)
|
||||
for fact in facts:
|
||||
self.assertEqual(len(fact.fact), 1)
|
||||
self.assertEqual(fact.fact['nested'][0]['name'], 'acpid')
|
||||
|
||||
def test_single_host(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(0, 1), 'name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
|
||||
self.check_query_results(self.builder.get_scan(0, 'packages')[:1], facts)
|
||||
|
||||
def test_all(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(), 'name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
|
||||
self.check_query_results(self.builder.get_scan(0, 'packages'), facts)
|
||||
|
||||
def test_subset_hosts(self):
|
||||
host_count = (self.builder.get_host_count() / 2)
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(0, host_count), 'name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
|
||||
self.check_query_results(self.builder.get_scan(0, 'packages')[:host_count], facts)
|
||||
|
||||
def test_get_single_facts_nested(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(), 'nested.name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
|
||||
self.check_query_results_nested(facts)
|
||||
|
||||
class FactGetSingleFactsMultipleScansTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactGetSingleFactsMultipleScansTest, self).setUp()
|
||||
self.builder = FactScanBuilder()
|
||||
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
|
||||
self.builder.build(scan_count=10, host_count=10)
|
||||
|
||||
def test_1_host(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(0, 1), 'name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
self.assertEqual(len(facts), 1)
|
||||
self.assertEqual(facts[0], self.builder.get_scan(0, 'packages')[0])
|
||||
|
||||
def test_multiple_hosts(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(0, 3), 'name', 'acpid', self.builder.get_timestamp(0), 'packages')
|
||||
self.assertEqual(len(facts), 3)
|
||||
for i, fact in enumerate(facts):
|
||||
self.assertEqual(fact, self.builder.get_scan(0, 'packages')[i])
|
||||
|
||||
def test_middle_of_timeline(self):
|
||||
facts = Fact.get_single_facts(self.builder.get_hostnames(0, 3), 'name', 'acpid', self.builder.get_timestamp(4), 'packages')
|
||||
self.assertEqual(len(facts), 3)
|
||||
for i, fact in enumerate(facts):
|
||||
self.assertEqual(fact, self.builder.get_scan(4, 'packages')[i])
|
||||
|
@ -1,127 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
import os
|
||||
import json
|
||||
|
||||
# Django
|
||||
from django.utils.timezone import now
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
from awx.fact.tests.base import BaseFactTest, FactScanBuilder, TEST_FACT_PACKAGES
|
||||
|
||||
__all__ = ['FactHostTest', 'FactTest', 'FactGetHostVersionTest', 'FactGetHostTimelineTest']
|
||||
|
||||
# damn you python 2.6
|
||||
def timedelta_total_seconds(timedelta):
|
||||
return (
|
||||
timedelta.microseconds + 0.0 +
|
||||
(timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
|
||||
|
||||
|
||||
class FactHostTest(BaseFactTest):
|
||||
def test_create_host(self):
|
||||
host = FactHost(hostname='hosty', inventory_id=1)
|
||||
host.save()
|
||||
|
||||
host = FactHost.objects.get(hostname='hosty', inventory_id=1)
|
||||
self.assertIsNotNone(host, "Host added but not found")
|
||||
self.assertEqual('hosty', host.hostname, "Gotten record hostname does not match expected hostname")
|
||||
self.assertEqual(1, host.inventory_id, "Gotten record inventory_id does not match expected inventory_id")
|
||||
|
||||
# Ensure an error is raised for .get() that doesn't match a record.
|
||||
def test_get_host_id_no_result(self):
|
||||
host = FactHost(hostname='hosty', inventory_id=1)
|
||||
host.save()
|
||||
|
||||
self.assertRaises(FactHost.DoesNotExist, FactHost.objects.get, hostname='doesnotexist', inventory_id=1)
|
||||
|
||||
class FactTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactTest, self).setUp()
|
||||
|
||||
def test_add_fact(self):
|
||||
timestamp = now().replace(microsecond=0)
|
||||
host = FactHost(hostname="hosty", inventory_id=1).save()
|
||||
(f_obj, v_obj) = Fact.add_fact(host=host, timestamp=timestamp, module='packages', fact=TEST_FACT_PACKAGES)
|
||||
f = Fact.objects.get(id=f_obj.id)
|
||||
v = FactVersion.objects.get(id=v_obj.id)
|
||||
|
||||
self.assertEqual(f.id, f_obj.id)
|
||||
self.assertEqual(f.module, 'packages')
|
||||
self.assertEqual(f.fact, TEST_FACT_PACKAGES)
|
||||
self.assertEqual(f.timestamp, timestamp)
|
||||
|
||||
# host relationship created
|
||||
self.assertEqual(f.host.id, host.id)
|
||||
|
||||
# version created and related
|
||||
self.assertEqual(v.id, v_obj.id)
|
||||
self.assertEqual(v.timestamp, timestamp)
|
||||
self.assertEqual(v.host.id, host.id)
|
||||
self.assertEqual(v.fact.id, f_obj.id)
|
||||
self.assertEqual(v.fact.module, 'packages')
|
||||
|
||||
# Note: Take the failure of this with a grain of salt.
|
||||
# The test almost entirely depends on the specs of the system running on.
|
||||
def test_add_fact_performance_4mb_file(self):
|
||||
timestamp = now().replace(microsecond=0)
|
||||
host = FactHost(hostname="hosty", inventory_id=1).save()
|
||||
|
||||
from awx.fact import tests
|
||||
with open('%s/data/file_scan.json' % os.path.dirname(os.path.realpath(tests.__file__))) as f:
|
||||
data = json.load(f)
|
||||
|
||||
t1 = now()
|
||||
(f_obj, v_obj) = Fact.add_fact(host=host, timestamp=timestamp, module='packages', fact=data)
|
||||
t2 = now()
|
||||
diff = timedelta_total_seconds(t2 - t1)
|
||||
print("add_fact save time: %s (s)" % diff)
|
||||
# Note: 20 is realllly high. This should complete in < 2 seconds
|
||||
self.assertLessEqual(diff, 20)
|
||||
|
||||
Fact.objects.get(id=f_obj.id)
|
||||
FactVersion.objects.get(id=v_obj.id)
|
||||
|
||||
class FactGetHostVersionTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactGetHostVersionTest, self).setUp()
|
||||
self.builder = FactScanBuilder()
|
||||
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
|
||||
self.builder.build(scan_count=2, host_count=1)
|
||||
|
||||
def test_get_host_version_exact_timestamp(self):
|
||||
fact_known = self.builder.get_scan(0, 'packages')[0]
|
||||
fact = Fact.get_host_version(hostname=self.builder.get_hostname(0), inventory_id=self.builder.get_inventory_id(), timestamp=self.builder.get_timestamp(0), module='packages')
|
||||
self.assertIsNotNone(fact)
|
||||
self.assertEqual(fact_known, fact)
|
||||
|
||||
def test_get_host_version_lte_timestamp(self):
|
||||
timestamp = self.builder.get_timestamp(0) + relativedelta(days=1)
|
||||
fact_known = self.builder.get_scan(0, 'packages')[0]
|
||||
fact = Fact.get_host_version(hostname=self.builder.get_hostname(0), inventory_id=self.builder.get_inventory_id(), timestamp=timestamp, module='packages')
|
||||
self.assertIsNotNone(fact)
|
||||
self.assertEqual(fact_known, fact)
|
||||
|
||||
def test_get_host_version_none(self):
|
||||
timestamp = self.builder.get_timestamp(0) - relativedelta(years=20)
|
||||
fact = Fact.get_host_version(hostname=self.builder.get_hostname(0), inventory_id=self.builder.get_inventory_id(), timestamp=timestamp, module='packages')
|
||||
self.assertIsNone(fact)
|
||||
|
||||
class FactGetHostTimelineTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactGetHostTimelineTest, self).setUp()
|
||||
self.builder = FactScanBuilder()
|
||||
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
|
||||
self.builder.build(scan_count=20, host_count=1)
|
||||
|
||||
def test_get_host_timeline_ok(self):
|
||||
timestamps = Fact.get_host_timeline(hostname=self.builder.get_hostname(0), inventory_id=self.builder.get_inventory_id(), module='packages')
|
||||
self.assertIsNotNone(timestamps)
|
||||
self.assertEqual(len(timestamps), self.builder.get_scan_count())
|
||||
for i in range(0, self.builder.get_scan_count()):
|
||||
self.assertEqual(timestamps[i], self.builder.get_timestamp(i))
|
@ -1,120 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
from datetime import datetime
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
|
||||
# Pymongo
|
||||
import pymongo
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
from awx.fact.tests.base import BaseFactTest
|
||||
|
||||
__all__ = ['FactTransformTest', 'FactTransformUpdateTest',]
|
||||
|
||||
TEST_FACT_PACKAGES_WITH_DOTS = [
|
||||
{
|
||||
"name": "acpid3.4",
|
||||
"version": "1:2.0.21-1ubuntu2",
|
||||
"deeper.key": "some_value"
|
||||
},
|
||||
{
|
||||
"name": "adduser.2",
|
||||
"source": "apt",
|
||||
"version": "3.113+nmu3ubuntu3"
|
||||
},
|
||||
{
|
||||
"what.ever." : {
|
||||
"shallowish.key": "some_shallow_value"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
TEST_FACT_PACKAGES_WITH_DOLLARS = [
|
||||
{
|
||||
"name": "acpid3$4",
|
||||
"version": "1:2.0.21-1ubuntu2",
|
||||
"deeper.key": "some_value"
|
||||
},
|
||||
{
|
||||
"name": "adduser$2",
|
||||
"source": "apt",
|
||||
"version": "3.113+nmu3ubuntu3"
|
||||
},
|
||||
{
|
||||
"what.ever." : {
|
||||
"shallowish.key": "some_shallow_value"
|
||||
}
|
||||
}
|
||||
]
|
||||
class FactTransformTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactTransformTest, self).setUp()
|
||||
# TODO: get host settings from config
|
||||
self.client = pymongo.MongoClient('localhost', 27017)
|
||||
self.db2 = self.client[settings.MONGO_DB]
|
||||
|
||||
self.timestamp = datetime.now().replace(microsecond=0)
|
||||
|
||||
def setup_create_fact_dot(self):
|
||||
self.host = FactHost(hostname='hosty', inventory_id=1).save()
|
||||
self.f = Fact(timestamp=self.timestamp, module='packages', fact=TEST_FACT_PACKAGES_WITH_DOTS, host=self.host)
|
||||
self.f.save()
|
||||
|
||||
def setup_create_fact_dollar(self):
|
||||
self.host = FactHost(hostname='hosty', inventory_id=1).save()
|
||||
self.f = Fact(timestamp=self.timestamp, module='packages', fact=TEST_FACT_PACKAGES_WITH_DOLLARS, host=self.host)
|
||||
self.f.save()
|
||||
|
||||
def test_fact_with_dot_serialized(self):
|
||||
self.setup_create_fact_dot()
|
||||
|
||||
q = {
|
||||
'_id': self.f.id
|
||||
}
|
||||
|
||||
# Bypass mongoengine and pymongo transform to get record
|
||||
f_dict = self.db2['fact'].find_one(q)
|
||||
self.assertIn('what\uff0Eever\uff0E', f_dict['fact'][2])
|
||||
|
||||
def test_fact_with_dot_serialized_pymongo(self):
|
||||
#self.setup_create_fact_dot()
|
||||
|
||||
host = FactHost(hostname='hosty', inventory_id=1).save()
|
||||
f = self.db['fact'].insert({
|
||||
'hostname': 'hosty',
|
||||
'fact': TEST_FACT_PACKAGES_WITH_DOTS,
|
||||
'timestamp': self.timestamp,
|
||||
'host': host.id,
|
||||
'module': 'packages',
|
||||
})
|
||||
|
||||
q = {
|
||||
'_id': f
|
||||
}
|
||||
# Bypass mongoengine and pymongo transform to get record
|
||||
f_dict = self.db2['fact'].find_one(q)
|
||||
self.assertIn('what\uff0Eever\uff0E', f_dict['fact'][2])
|
||||
|
||||
def test_fact_with_dot_deserialized_pymongo(self):
|
||||
self.setup_create_fact_dot()
|
||||
|
||||
q = {
|
||||
'_id': self.f.id
|
||||
}
|
||||
f_dict = self.db['fact'].find_one(q)
|
||||
self.assertIn('what.ever.', f_dict['fact'][2])
|
||||
|
||||
def test_fact_with_dot_deserialized(self):
|
||||
self.setup_create_fact_dot()
|
||||
|
||||
f = Fact.objects.get(id=self.f.id)
|
||||
self.assertIn('what.ever.', f.fact[2])
|
||||
|
||||
class FactTransformUpdateTest(BaseFactTest):
|
||||
pass
|
@ -1,96 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
from datetime import datetime
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
|
||||
# Pymongo
|
||||
import pymongo
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
from awx.fact.tests.base import BaseFactTest
|
||||
|
||||
__all__ = ['FactSerializePymongoTest', 'FactDeserializePymongoTest',]
|
||||
|
||||
class FactPymongoBaseTest(BaseFactTest):
|
||||
def setUp(self):
|
||||
super(FactPymongoBaseTest, self).setUp()
|
||||
# TODO: get host settings from config
|
||||
self.client = pymongo.MongoClient('localhost', 27017)
|
||||
self.db2 = self.client[settings.MONGO_DB]
|
||||
|
||||
def _create_fact(self):
|
||||
fact = {}
|
||||
fact[self.k] = self.v
|
||||
q = {
|
||||
'hostname': 'blah'
|
||||
}
|
||||
h = self.db['fact_host'].insert(q)
|
||||
q = {
|
||||
'host': h,
|
||||
'module': 'blah',
|
||||
'timestamp': datetime.now(),
|
||||
'fact': fact
|
||||
}
|
||||
f = self.db['fact'].insert(q)
|
||||
return f
|
||||
|
||||
def check_transform(self, id):
|
||||
raise RuntimeError("Must override")
|
||||
|
||||
def create_dot_fact(self):
|
||||
self.k = 'this.is.a.key'
|
||||
self.v = 'this.is.a.value'
|
||||
|
||||
self.k_uni = 'this\uff0Eis\uff0Ea\uff0Ekey'
|
||||
|
||||
return self._create_fact()
|
||||
|
||||
def create_dollar_fact(self):
|
||||
self.k = 'this$is$a$key'
|
||||
self.v = 'this$is$a$value'
|
||||
|
||||
self.k_uni = 'this\uff04is\uff04a\uff04key'
|
||||
|
||||
return self._create_fact()
|
||||
|
||||
class FactSerializePymongoTest(FactPymongoBaseTest):
|
||||
def check_transform(self, id):
|
||||
q = {
|
||||
'_id': id
|
||||
}
|
||||
f = self.db2.fact.find_one(q)
|
||||
self.assertIn(self.k_uni, f['fact'])
|
||||
self.assertEqual(f['fact'][self.k_uni], self.v)
|
||||
|
||||
# Ensure key . are being transformed to the equivalent unicode into the database
|
||||
def test_key_transform_dot(self):
|
||||
f = self.create_dot_fact()
|
||||
self.check_transform(f)
|
||||
|
||||
# Ensure key $ are being transformed to the equivalent unicode into the database
|
||||
def test_key_transform_dollar(self):
|
||||
f = self.create_dollar_fact()
|
||||
self.check_transform(f)
|
||||
|
||||
class FactDeserializePymongoTest(FactPymongoBaseTest):
|
||||
def check_transform(self, id):
|
||||
q = {
|
||||
'_id': id
|
||||
}
|
||||
f = self.db.fact.find_one(q)
|
||||
self.assertIn(self.k, f['fact'])
|
||||
self.assertEqual(f['fact'][self.k], self.v)
|
||||
|
||||
def test_key_transform_dot(self):
|
||||
f = self.create_dot_fact()
|
||||
self.check_transform(f)
|
||||
|
||||
def test_key_transform_dollar(self):
|
||||
f = self.create_dollar_fact()
|
||||
self.check_transform(f)
|
@ -1,6 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from .dbtransform import * # noqa
|
@ -1,113 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import * # noqa
|
||||
from awx.fact.utils.dbtransform import KeyTransform
|
||||
|
||||
#__all__ = ['DBTransformTest', 'KeyTransformUnitTest']
|
||||
__all__ = ['KeyTransformUnitTest']
|
||||
|
||||
class KeyTransformUnitTest(TestCase):
|
||||
def setUp(self):
|
||||
super(KeyTransformUnitTest, self).setUp()
|
||||
self.key_transform = KeyTransform([('.', '\uff0E'), ('$', '\uff04')])
|
||||
|
||||
def test_no_replace(self):
|
||||
value = {
|
||||
"a_key_with_a_dict" : {
|
||||
"key" : "value",
|
||||
"nested_key_with_dict": {
|
||||
"nested_key_with_value" : "deep_value"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data = self.key_transform.transform_incoming(value, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
data = self.key_transform.transform_outgoing(value, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
def test_complex(self):
|
||||
value = {
|
||||
"a.key.with.a.dict" : {
|
||||
"key" : "value",
|
||||
"nested.key.with.dict": {
|
||||
"nested.key.with.value" : "deep_value"
|
||||
}
|
||||
}
|
||||
}
|
||||
value_transformed = {
|
||||
"a\uff0Ekey\uff0Ewith\uff0Ea\uff0Edict" : {
|
||||
"key" : "value",
|
||||
"nested\uff0Ekey\uff0Ewith\uff0Edict": {
|
||||
"nested\uff0Ekey\uff0Ewith\uff0Evalue" : "deep_value"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data = self.key_transform.transform_incoming(value, None)
|
||||
self.assertEqual(data, value_transformed)
|
||||
|
||||
data = self.key_transform.transform_outgoing(value_transformed, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
def test_simple(self):
|
||||
value = {
|
||||
"a.key" : "value"
|
||||
}
|
||||
value_transformed = {
|
||||
"a\uff0Ekey" : "value"
|
||||
}
|
||||
|
||||
data = self.key_transform.transform_incoming(value, None)
|
||||
self.assertEqual(data, value_transformed)
|
||||
|
||||
data = self.key_transform.transform_outgoing(value_transformed, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
def test_nested_dict(self):
|
||||
value = {
|
||||
"a.key.with.a.dict" : {
|
||||
"nested.key." : "value"
|
||||
}
|
||||
}
|
||||
value_transformed = {
|
||||
"a\uff0Ekey\uff0Ewith\uff0Ea\uff0Edict" : {
|
||||
"nested\uff0Ekey\uff0E" : "value"
|
||||
}
|
||||
}
|
||||
|
||||
data = self.key_transform.transform_incoming(value, None)
|
||||
self.assertEqual(data, value_transformed)
|
||||
|
||||
data = self.key_transform.transform_outgoing(value_transformed, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
def test_array(self):
|
||||
value = {
|
||||
"a.key.with.an.array" : [
|
||||
{
|
||||
"key.with.dot" : "value"
|
||||
}
|
||||
]
|
||||
}
|
||||
value_transformed = {
|
||||
"a\uff0Ekey\uff0Ewith\uff0Ean\uff0Earray" : [
|
||||
{
|
||||
"key\uff0Ewith\uff0Edot" : "value"
|
||||
}
|
||||
]
|
||||
}
|
||||
data = self.key_transform.transform_incoming(value, None)
|
||||
self.assertEqual(data, value_transformed)
|
||||
|
||||
data = self.key_transform.transform_outgoing(value_transformed, None)
|
||||
self.assertEqual(data, value)
|
||||
|
||||
'''
|
||||
class DBTransformTest(BaseTest, MongoDBRequired):
|
||||
'''
|
Loading…
Reference in New Issue
Block a user