1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #3192 from wwitzel3/test-refactoring

Update some tests and refactor some code.
This commit is contained in:
Wayne Witzel III 2016-08-04 11:17:42 -04:00 committed by GitHub
commit f30292c7f1
4 changed files with 63 additions and 30 deletions

View File

@ -2966,21 +2966,7 @@ class JobJobTasksList(BaseJobEventsList):
return ({'detail': 'Parent event not found.'}, -1, status.HTTP_404_NOT_FOUND)
parent_task = parent_task[0]
# Some events correspond to a playbook or task starting up,
# and these are what we're interested in here.
STARTING_EVENTS = ('playbook_on_task_start', 'playbook_on_setup')
# We need to pull information about each start event.
#
# This is super tricky, because this table has a one-to-many
# relationship with itself (parent-child), and we're getting
# information for an arbitrary number of children. This means we
# need stats on grandchildren, sorted by child.
queryset = (JobEvent.objects.filter(parent__parent=parent_task,
parent__event__in=STARTING_EVENTS)
.values('parent__id', 'event', 'changed')
.annotate(num=Count('event'))
.order_by('parent__id'))
queryset = JobEvent.start_event_queryset(parent_task)
# The data above will come back in a list, but we are going to
# want to access it based on the parent id, so map it into a

View File

@ -1206,6 +1206,30 @@ class JobEvent(CreatedModifiedModel):
job.inventory.update_computed_fields()
emit_websocket_notification('/socket.io/jobs', 'summary_complete', dict(unified_job_id=job.id))
@property
def STARTING_EVENTS():
return ('playbook_on_task_start', 'playbook_on_setup')
@classmethod
def get_startevent_queryset(cls, parent_task, ordering=None):
'''
We need to pull information about each start event.
This is super tricky, because this table has a one-to-many
relationship with itself (parent-child), and we're getting
information for an arbitrary number of children. This means we
need stats on grandchildren, sorted by child.
'''
qs = (JobEvent.objects.filter(parent__parent=parent_task,
parent__event__in=STARTING_EVENTS)
.values('parent__id', 'event', 'changed')
.annotate(num=Count('event'))
.order_by('parent__id'))
if ordering is not None:
qs = qs.order_by(ordering)
return qs
class SystemJobOptions(BaseModel):
'''
Common fields for SystemJobTemplate and SystemJob.

View File

@ -1,26 +1,42 @@
import pytest
from awx.main.models.credential import Credential
from awx.main.models.jobs import Job
from awx.main.models.inventory import Inventory
from awx.main.tasks import RunJob
@pytest.fixture
def options():
return {
'username':'test',
'password':'test',
'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
'authorize': True,
'authorize_password': 'passwd',
}
def test_net_cred_parse(mocker, options):
def test_aws_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2)
options = {
'kind': 'aws',
'username': 'aws_user',
'password': 'aws_passwd',
'security_token': 'token',
}
job.cloud_credential = Credential(**options)
run_job = RunJob()
mocker.patch.object(run_job, 'should_use_proot', return_value=False)
env = run_job.build_env(job, private_data_dir='/tmp')
assert env['AWS_ACCESS_KEY'] == options['username']
assert env['AWS_SECRET_KEY'] == options['password']
assert env['AWS_SECURITY_TOKEN'] == options['security_token']
def test_net_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2)
options = {
'username':'test',
'password':'test',
'authorize': True,
'authorize_password': 'passwd',
}
job.network_credential = Credential(**options)
run_job = RunJob()
@ -33,10 +49,17 @@ def test_net_cred_parse(mocker, options):
assert env['ANSIBLE_NET_AUTHORIZE_PASSWORD'] == options['authorize_password']
def test_net_cred_ssh_agent(mocker, options, get_ssh_version):
def test_net_cred_ssh_agent(mocker, get_ssh_version):
with mocker.patch('django.db.ConnectionRouter.db_for_write'):
run_job = RunJob()
options = {
'username':'test',
'password':'test',
'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
'authorize': True,
'authorize_password': 'passwd',
}
mock_job_attrs = {'forks': False, 'id': 1, 'cancel_flag': False, 'status': 'running', 'job_type': 'normal',
'credential': None, 'cloud_credential': None, 'network_credential': Credential(**options),
'become_enabled': False, 'become_method': None, 'become_username': None,