mirror of https://github.com/ansible/awx.git synced 2024-10-28 02:25:27 +03:00

Merge pull request #5938 from chrismeyersfsu/feature-jsonsearch

basic fact search grammar
This commit is contained in:
Chris Meyers 2017-04-04 09:46:55 -04:00 committed by GitHub
commit 83e186cb59
6 changed files with 340 additions and 3 deletions

View File

@ -75,7 +75,7 @@ class FieldLookupBackend(BaseFilterBackend):
RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by',
'search', 'type')
'search', 'type', 'host_filter')
SUPPORTED_LOOKUPS = ('exact', 'iexact', 'contains', 'icontains',
'startswith', 'istartswith', 'endswith', 'iendswith',

View File

@ -78,6 +78,7 @@ from awx.api.metadata import RoleMetadata
from awx.main.consumers import emit_channel_notification
from awx.main.models.unified_jobs import ACTIVE_STATES
from awx.main.scheduler.tasks import run_job_complete
from awx.main.fields import DynamicFilterField
logger = logging.getLogger('awx.api.views')
@ -1701,6 +1702,14 @@ class HostList(ListCreateAPIView):
serializer_class = HostSerializer
capabilities_prefetch = ['inventory.admin']
def get_queryset(self):
qs = super(HostList, self).get_queryset()
filter_string = self.request.query_params.get('host_filter', None)
if filter_string:
filter_q = DynamicFilterField.filter_string_to_q(filter_string)
qs = qs.filter(filter_q)
return qs
class HostDetail(RetrieveUpdateDestroyAPIView):

View File

@ -1,7 +1,11 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import json
import re
import sys
from pyparsing import infixNotation, opAssoc, Optional, Literal, CharsNotIn
# Django
from django.db.models.signals import (
@ -18,6 +22,7 @@ from django.db.models.fields.related import (
from django.utils.encoding import smart_text
from django.db.models import Q
# Django-JSONField
from jsonfield import JSONField as upstream_JSONField
@ -27,7 +32,7 @@ from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
from awx.main.utils import get_current_apps
__all__ = ['AutoOneToOneField', 'ImplicitRoleField', 'JSONField']
__all__ = ['AutoOneToOneField', 'ImplicitRoleField', 'JSONField', 'DynamicFilterField']
class JSONField(upstream_JSONField):
@ -292,3 +297,187 @@ class ImplicitRoleField(models.ForeignKey):
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role.rebuild_role_ancestor_list([], child_ids)
unicode_spaces = [unichr(c) for c in xrange(sys.maxunicode) if unichr(c).isspace()]
unicode_spaces_other = unicode_spaces + [u'(', u')', u'=', u'"']
def string_to_type(t):
if t == 'true':
return True
elif t == 'false':
return False
if re.search('^[-+]?[0-9]+$',t):
return int(t)
if re.search('^[-+]?[0-9]+\.[0-9]+$',t):
return float(t)
return t
class DynamicFilterField(models.TextField):
class BoolOperand(object):
def __init__(self, t):
#print("Got t %s" % t)
kwargs = dict()
k, v = self._extract_key_value(t)
k, v = self._json_path_to_contains(k, v)
kwargs[k] = v
self.result = Q(**kwargs)
TODO: We should be able to express this in the grammar and let
pyparsing do the heavy lifting.
TODO: separate django filter requests from our custom json filter
request so we don't process the key any. This could be
accomplished using a whitelist or introspecting the
relationship refered to to see if it's a jsonb type.
def _json_path_to_contains(self, k, v):
pieces = k.split('__')
flag_first_arr_found = False
assembled_k = ''
assembled_v = v
last_kv = None
last_v = None
contains_count = 0
for i, piece in enumerate(pieces):
if flag_first_arr_found is False and piece.endswith('[]'):
assembled_k += '%s__contains' % (piece[0:-2])
contains_count += 1
flag_first_arr_found = True
elif flag_first_arr_found is False and i == len(pieces) - 1:
assembled_k += '%s' % piece
elif flag_first_arr_found is False:
assembled_k += '%s__' % piece
elif flag_first_arr_found is True:
new_kv = dict()
if piece.endswith('[]'):
new_v = []
new_kv[piece[0:-2]] = new_v
new_v = dict()
new_kv[piece] = new_v
if last_v is None:
last_v = []
assembled_v = last_v
if type(last_v) is list:
elif type(last_v) is dict:
last_kv[last_kv.keys()[0]] = new_kv
last_v = new_v
last_kv = new_kv
contains_count += 1
if contains_count > 1:
if type(last_v) is list:
if type(last_v) is dict:
last_kv[last_kv.keys()[0]] = v
return (assembled_k, assembled_v)
def _extract_key_value(self, t):
t_len = len(t)
k = None
v = None
# key
# "something"=
v_offset = 2
if t_len >= 2 and t[0] == "\"" and t[2] == "\"":
k = t[1]
v_offset = 4
# something=
k = t[0]
# value
# ="something"
if t_len > (v_offset + 2) and t[v_offset] == "\"" and t[v_offset + 2] == "\"":
v = t[v_offset + 1]
# empty ""
elif t_len > (v_offset + 1):
v = ""
# no ""
v = string_to_type(t[v_offset])
return (k, v)
class BoolBinOp(object):
def __init__(self, t):
self.left = t[0][0].result
self.right = t[0][2].result
self.result = self.execute_logic(self.left, self.right)
class BoolAnd(BoolBinOp):
def execute_logic(self, left, right):
return left & right
class BoolOr(BoolBinOp):
def execute_logic(self, left, right):
return left | right
class BoolNot(object):
def __init__(self,t):
self.right = t[0][1]
self.result = ~self.right
def filter_string_to_q(cls, filter_string):
* handle values with " via: a.b.c.d="hello\"world"
* handle keys with " via: a.\"b.c="yeah"
* handle key with __ in it
* add not support
* transform [] into contains via: a.b.c[].d[].e.f[]="blah"
* handle optional value quoted: a.b.c=""
atom = CharsNotIn(unicode_spaces_other)
atom_inside_quotes = CharsNotIn(u'"')
atom_quoted = Literal('"') + Optional(atom_inside_quotes) + Literal('"')
EQUAL = Literal('=')
grammar = ((atom_quoted | atom) + EQUAL + Optional((atom_quoted | atom)))
boolExpr = infixNotation(grammar, [
#("not", 1, opAssoc.RIGHT, cls.BoolNot),
("and", 2, opAssoc.LEFT, cls.BoolAnd),
("or", 2, opAssoc.LEFT, cls.BoolOr),
res = boolExpr.parseString('(' + filter_string + ')')
if len(res) > 0:
return res[0].result
raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string)

View File

@ -0,0 +1,49 @@
# TODO: As of writing this our only concern is ensuring that the fact feature is reflected in the Host endpoint.
# Other host tests should live here to make this test suite more complete.
import pytest
import urllib
from awx.api.versioning import reverse
from awx.main.models import Organization, Host, Group, Inventory
def inventory_structure():
org = Organization.objects.create(name="org")
inv = Inventory.objects.create(name="inv", organization=org)
Host.objects.create(name="host1", inventory=inv)
Host.objects.create(name="host2", inventory=inv)
Host.objects.create(name="host3", inventory=inv)
Group.objects.create(name="g1", inventory=inv)
Group.objects.create(name="g2", inventory=inv)
Group.objects.create(name="g3", inventory=inv)
def test_q1(inventory_structure, get, user):
def evaluate_query(query, expected_hosts):
url = reverse('api:host_list')
get_params = "?host_filter=%s" % urllib.quote(query, safe='')
response = get(url + get_params, user('admin', True))
hosts = response.data['results']
assert len(expected_hosts) == len(hosts)
host_ids = [host['id'] for host in hosts]
for i, expected_host in enumerate(expected_hosts):
assert expected_host.id in host_ids
hosts = Host.objects.all()
groups = Group.objects.all()
groups[0].hosts.add(hosts[0], hosts[1])
groups[1].hosts.add(hosts[0], hosts[1], hosts[2])
query = '(name="host1" and groups__name="g1")'
evaluate_query(query, [hosts[0]])
query = '(name="host1" and groups__name="g1") or (name="host3" and groups__name="g2")'
evaluate_query(query, [hosts[0], hosts[2]])

View File

@ -7,10 +7,19 @@ from awx.api.filters import FieldLookupBackend
from awx.main.models import (AdHocCommand, AuthToken, CustomInventoryScript,
Credential, Job, JobTemplate, SystemJob,
UnifiedJob, User, WorkflowJob,
WorkflowJobTemplate, WorkflowJobOptions)
WorkflowJobTemplate, WorkflowJobOptions,
from awx.main.models.jobs import JobOptions
def test_related():
field_lookup = FieldLookupBackend()
lookup = '__'.join(['inventory', 'organization', 'pk'])
field, new_lookup = field_lookup.get_field_from_lookup(InventorySource, lookup)
@pytest.mark.parametrize(u"empty_value", [u'', ''])
def test_empty_in(empty_value):
field_lookup = FieldLookupBackend()

View File

@ -0,0 +1,81 @@
# Python
import pytest
from pyparsing import ParseException
from awx.main.fields import DynamicFilterField
# Django
from django.db.models import Q
class TestDynamicFilterFieldFilterStringToQ():
@pytest.mark.parametrize("filter_string,q_expected", [
('facts__facts__blank=""', Q(facts__facts__blank="")),
('"facts__facts__ space "="f"', Q(**{ "facts__facts__ space ": "f"})),
('"facts__facts__ e "=no_quotes_here', Q(**{ "facts__facts__ e ": "no_quotes_here"})),
('a__b__c=3', Q(**{ "a__b__c": 3})),
('a__b__c=3.14', Q(**{ "a__b__c": 3.14})),
('a__b__c=true', Q(**{ "a__b__c": True})),
('a__b__c=false', Q(**{ "a__b__c": False})),
('a__b__c="true"', Q(**{ "a__b__c": "true"})),
#('"a__b\"__c"="true"', Q(**{ "a__b\"__c": "true"})),
#('a__b\"__c="true"', Q(**{ "a__b\"__c": "true"})),
def test_query_generated(self, filter_string, q_expected):
q = DynamicFilterField.filter_string_to_q(filter_string)
assert str(q) == str(q_expected)
@pytest.mark.parametrize("filter_string", [
'a__b__c__ space =ggg',
def test_invalid_filter_strings(self, filter_string):
with pytest.raises(ParseException):
@pytest.mark.parametrize("filter_string,q_expected", [
(u'(a=abc\u1F5E3def)', Q(**{u"a": u"abc\u1F5E3def"})),
def test_unicode(self, filter_string, q_expected):
q = DynamicFilterField.filter_string_to_q(filter_string)
assert str(q) == str(q_expected)
@pytest.mark.parametrize("filter_string,q_expected", [
('(a=b)', Q(**{"a": "b"})),
('a=b and c=d', Q(**{"a": "b"}) & Q(**{"c": "d"})),
('(a=b and c=d)', Q(**{"a": "b"}) & Q(**{"c": "d"})),
('a=b or c=d', Q(**{"a": "b"}) | Q(**{"c": "d"})),
('(a=b and c=d) or (e=f)', (Q(**{"a": "b"}) & Q(**{"c": "d"})) | (Q(**{"e": "f"}))),
('(a=b) and (c=d or (e=f and (g=h or i=j))) or (y=z)', Q(**{"a": "b"}) & (Q(**{"c": "d"}) | (Q(**{"e": "f"}) & (Q(**{"g": "h"}) | Q(**{"i": "j"})))) | Q(**{"y": "z"}))
def test_boolean_parenthesis(self, filter_string, q_expected):
q = DynamicFilterField.filter_string_to_q(filter_string)
assert str(q) == str(q_expected)
@pytest.mark.parametrize("filter_string,q_expected", [
('a__b__c[]=3', Q(**{ "a__b__c__contains": 3})),
('a__b__c[]=3.14', Q(**{ "a__b__c__contains": 3.14})),
('a__b__c[]=true', Q(**{ "a__b__c__contains": True})),
('a__b__c[]=false', Q(**{ "a__b__c__contains": False})),
('a__b__c[]="true"', Q(**{ "a__b__c__contains": "true"})),
('a__b__c[]__d[]="foobar"', Q(**{ "a__b__c__contains": [{"d": ["foobar"]}]})),
('a__b__c[]__d="foobar"', Q(**{ "a__b__c__contains": [{"d": "foobar"}]})),
('a__b__c[]__d__e="foobar"', Q(**{ "a__b__c__contains": [{"d": {"e": "foobar"}}]})),
('a__b__c[]__d__e[]="foobar"', Q(**{ "a__b__c__contains": [{"d": {"e": ["foobar"]}}]})),
('a__b__c[]__d__e__f[]="foobar"', Q(**{ "a__b__c__contains": [{"d": {"e": {"f": ["foobar"]}}}]})),
('(a__b__c[]__d__e__f[]="foobar") and (a__b__c[]__d__e[]="foobar")', Q(**{ "a__b__c__contains": [{"d": {"e": {"f": ["foobar"]}}}]}) & Q(**{ "a__b__c__contains": [{"d": {"e": ["foobar"]}}]})),
#('"a__b\"__c"="true"', Q(**{ "a__b\"__c": "true"})),
#('a__b\"__c="true"', Q(**{ "a__b\"__c": "true"})),
def test_contains_query_generated(self, filter_string, q_expected):
q = DynamicFilterField.filter_string_to_q(filter_string)
assert str(q) == str(q_expected)
#('"facts__quoted_val"="f\"oo"', 1),
#('facts__facts__arr[]="foo"', 1),
#('facts__facts__arr_nested[]__a[]="foo"', 1),