1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 06:51:10 +03:00

Merge pull request #843 from ansible/remove_old_tests

Removing old unused tests
This commit is contained in:
Matthew Jones 2017-12-14 23:55:12 -05:00 committed by GitHub
commit 5ec537bad2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 0 additions and 9695 deletions

View File

@ -1,7 +0,0 @@
Old Tests
=========
This are the old django.TestCase / unittest.TestCase tests for Tower.
No new tests should be added to this folder. Overtime, we will be refactoring
tests out of this folder as py.test tests and into their respective functional
and unit folders.

View File

@ -1,961 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import glob
import os
import subprocess
import tempfile
import mock
import unittest2 as unittest
# Django
from django.conf import settings
from django.core.urlresolvers import reverse
# Django-CRUM
from crum import impersonate
# AWX
from awx.main.utils import * # noqa
from awx.main.models import * # noqa
from awx.main.tests.base import BaseJobExecutionTest
from awx.main.tests.data.ssh import (
TEST_SSH_KEY_DATA,
TEST_SSH_KEY_DATA_LOCKED,
TEST_SSH_KEY_DATA_UNLOCK,
)
__all__ = ['RunAdHocCommandTest', 'AdHocCommandApiTest']
class BaseAdHocCommandTest(BaseJobExecutionTest):
'''
Common initialization for testing ad hoc commands.
'''
def setUp(self):
with ignore_inventory_computed_fields():
super(BaseAdHocCommandTest, self).setUp()
self.setup_instances()
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.organization.admin_role.members.add(self.normal_django_user)
self.inventory = self.organization.inventories.create(name='test-inventory', description='description for test-inventory')
self.host = self.inventory.hosts.create(name='host.example.com')
self.host2 = self.inventory.hosts.create(name='host2.example.com')
self.group = self.inventory.groups.create(name='test-group')
self.group2 = self.inventory.groups.create(name='test-group2')
self.group.hosts.add(self.host)
self.group2.hosts.add(self.host, self.host2)
self.inventory2 = self.organization.inventories.create(name='test-inventory2')
self.host3 = self.inventory2.hosts.create(name='host3.example.com')
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
settings.CALLBACK_CONSUMER_PORT = ''
def create_test_credential(self, **kwargs):
self.credential = self.make_credential(**kwargs)
return self.credential
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class RunAdHocCommandTest(BaseAdHocCommandTest):
'''
Test cases for RunAdHocCommand celery task.
'''
def create_test_ad_hoc_command(self, **kwargs):
with impersonate(self.super_django_user):
opts = {
'inventory': self.inventory,
'credential': self.credential,
'job_type': 'run',
'module_name': 'command',
'module_args': 'uptime',
}
opts.update(kwargs)
self.ad_hoc_command = AdHocCommand.objects.create(**opts)
return self.ad_hoc_command
def check_ad_hoc_command_events(self, ad_hoc_command, runner_status='ok',
hosts=None):
ad_hoc_command_events = ad_hoc_command.ad_hoc_command_events.all()
for ad_hoc_command_event in ad_hoc_command_events:
unicode(ad_hoc_command_event) # For test coverage.
should_be_failed = bool(runner_status not in ('ok', 'skipped'))
should_be_changed = bool(runner_status in ('ok', 'failed') and ad_hoc_command.job_type == 'run')
if hosts is not None:
host_pks = set([x.pk for x in hosts])
else:
host_pks = set(ad_hoc_command.inventory.hosts.values_list('pk', flat=True))
qs = ad_hoc_command_events.filter(event=('runner_on_%s' % runner_status))
self.assertEqual(qs.count(), len(host_pks))
for evt in qs:
self.assertTrue(evt.host_id in host_pks)
self.assertTrue(evt.host_name)
self.assertEqual(evt.failed, should_be_failed)
self.assertEqual(evt.changed, should_be_changed)
def test_run_ad_hoc_command(self):
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command, 'ok')
def test_check_mode_ad_hoc_command(self):
ad_hoc_command = self.create_test_ad_hoc_command(module_name='ping', module_args='', job_type='check')
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command, 'ok')
def test_run_ad_hoc_command_that_fails(self):
ad_hoc_command = self.create_test_ad_hoc_command(module_args='false')
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'failed')
self.check_ad_hoc_command_events(ad_hoc_command, 'failed')
def test_check_mode_where_command_would_fail(self):
ad_hoc_command = self.create_test_ad_hoc_command(job_type='check', module_args='false')
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command, 'skipped')
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('canceled', 0))
def test_cancel_ad_hoc_command(self, ignore):
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.cancel_flag)
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
ad_hoc_command.cancel_flag = True
ad_hoc_command.save(update_fields=['cancel_flag'])
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'canceled')
self.assertTrue(ad_hoc_command.cancel_flag)
# Calling cancel afterwards just returns the cancel flag.
self.assertTrue(ad_hoc_command.cancel())
# Read attribute for test coverage.
ad_hoc_command.celery_task
ad_hoc_command.celery_task_id = ''
ad_hoc_command.save(update_fields=['celery_task_id'])
self.assertEqual(ad_hoc_command.celery_task, None)
# Unable to start ad hoc command again.
self.assertFalse(ad_hoc_command.signal_start())
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_ad_hoc_command_options(self, ignore):
ad_hoc_command = self.create_test_ad_hoc_command(forks=2, verbosity=2)
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertTrue('"--forks=2"' in ad_hoc_command.job_args)
self.assertTrue('"-vv"' in ad_hoc_command.job_args)
# Test with basic become privilege escalation
ad_hoc_command2 = self.create_test_ad_hoc_command(become_enabled=True)
self.assertEqual(ad_hoc_command2.status, 'new')
self.assertFalse(ad_hoc_command2.passwords_needed_to_start)
self.assertTrue(ad_hoc_command2.signal_start())
ad_hoc_command2 = AdHocCommand.objects.get(pk=ad_hoc_command2.pk)
self.check_job_result(ad_hoc_command2, ('successful', 'failed'))
self.assertTrue('"--become"' in ad_hoc_command2.job_args)
def test_limit_option(self):
# Test limit by hostname.
ad_hoc_command = self.create_test_ad_hoc_command(limit='host.example.com')
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command, 'ok', hosts=[self.host])
self.assertTrue('"host.example.com"' in ad_hoc_command.job_args)
# Test limit by group name.
ad_hoc_command2 = self.create_test_ad_hoc_command(limit='test-group')
self.assertEqual(ad_hoc_command2.status, 'new')
self.assertFalse(ad_hoc_command2.passwords_needed_to_start)
self.assertTrue(ad_hoc_command2.signal_start())
ad_hoc_command2 = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command2, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command2, 'ok', hosts=[self.host])
# Test limit by host not in inventory.
ad_hoc_command3 = self.create_test_ad_hoc_command(limit='bad-host')
self.assertEqual(ad_hoc_command3.status, 'new')
self.assertFalse(ad_hoc_command3.passwords_needed_to_start)
self.assertTrue(ad_hoc_command3.signal_start())
ad_hoc_command3 = AdHocCommand.objects.get(pk=ad_hoc_command3.pk)
self.check_job_result(ad_hoc_command3, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command3, 'ok', hosts=[])
self.assertEqual(ad_hoc_command3.ad_hoc_command_events.count(), 0)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_ssh_username_and_password(self, ignore):
self.create_test_credential(username='sshuser', password='sshpass')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertIn('"-u"', ad_hoc_command.job_args)
self.assertIn('"--ask-pass"', ad_hoc_command.job_args)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_ssh_ask_password(self, ignore):
self.create_test_credential(password='ASK')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertTrue(ad_hoc_command.passwords_needed_to_start)
self.assertTrue('ssh_password' in ad_hoc_command.passwords_needed_to_start)
self.assertFalse(ad_hoc_command.signal_start())
self.assertTrue(ad_hoc_command.signal_start(ssh_password='sshpass'))
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertIn('"--ask-pass"', ad_hoc_command.job_args)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_sudo_username_and_password(self, ignore):
self.create_test_credential(become_method="sudo",
become_username='sudouser',
become_password='sudopass')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, ('successful', 'failed'))
self.assertIn('"--become-method"', ad_hoc_command.job_args)
self.assertIn('"--become-user"', ad_hoc_command.job_args)
self.assertIn('"--ask-become-pass"', ad_hoc_command.job_args)
self.assertNotIn('"--become"', ad_hoc_command.job_args)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_sudo_ask_password(self, ignore):
self.create_test_credential(become_password='ASK')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertTrue(ad_hoc_command.passwords_needed_to_start)
self.assertTrue('become_password' in ad_hoc_command.passwords_needed_to_start)
self.assertFalse(ad_hoc_command.signal_start())
self.assertTrue(ad_hoc_command.signal_start(become_password='sudopass'))
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, ('successful', 'failed'))
self.assertIn('"--ask-become-pass"', ad_hoc_command.job_args)
self.assertNotIn('"--become-user"', ad_hoc_command.job_args)
self.assertNotIn('"--become"', ad_hoc_command.job_args)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('successful', 0))
def test_unlocked_ssh_key(self, ignore):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA)
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertNotIn('"--private-key=', ad_hoc_command.job_args)
self.assertIn('ssh-agent', ad_hoc_command.job_args)
def test_locked_ssh_key_with_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK)
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertIn('ssh-agent', ad_hoc_command.job_args)
self.assertNotIn('Bad passphrase', ad_hoc_command.result_stdout)
def test_locked_ssh_key_with_bad_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='not the passphrase')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'failed')
self.assertIn('ssh-agent', ad_hoc_command.job_args)
self.assertIn('Bad passphrase', ad_hoc_command.result_stdout)
def test_locked_ssh_key_ask_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='ASK')
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertTrue(ad_hoc_command.passwords_needed_to_start)
self.assertTrue('ssh_key_unlock' in ad_hoc_command.passwords_needed_to_start)
self.assertFalse(ad_hoc_command.signal_start())
self.assertTrue(ad_hoc_command.signal_start(ssh_key_unlock='not it'))
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'failed')
self.assertTrue('ssh-agent' in ad_hoc_command.job_args)
self.assertTrue('Bad passphrase' in ad_hoc_command.result_stdout)
# Try again and pass correct password.
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertTrue(ad_hoc_command.passwords_needed_to_start)
self.assertTrue('ssh_key_unlock' in ad_hoc_command.passwords_needed_to_start)
self.assertFalse(ad_hoc_command.signal_start())
self.assertTrue(ad_hoc_command.signal_start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK))
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.assertIn('ssh-agent', ad_hoc_command.job_args)
self.assertNotIn('Bad passphrase', ad_hoc_command.result_stdout)
def test_run_with_bubblewrap(self):
# Only run test if bubblewrap is installed
cmd = [getattr(settings, 'AWX_PROOT_CMD', 'bwrap'), '--version']
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
has_bubblewrap = bool(proc.returncode == 0)
except (OSError, ValueError):
has_bubblewrap = False
if not has_bubblewrap:
self.skipTest('bubblewrap is not installed')
# Enable bubblewrap for this test.
settings.AWX_PROOT_ENABLED = True
# Hide local settings path.
settings.AWX_PROOT_HIDE_PATHS = [os.path.join(settings.BASE_DIR, 'settings')]
# Create list of paths that should not be visible to the command.
hidden_paths = [
os.path.join(settings.PROJECTS_ROOT, '*'),
os.path.join(settings.JOBOUTPUT_ROOT, '*'),
]
# Create a temp directory that should not be visible to the command.
temp_path = tempfile.mkdtemp()
self._temp_paths.append(temp_path)
hidden_paths.append(temp_path)
# Find a file in supervisor logs that should not be visible.
try:
supervisor_log_path = glob.glob('/var/log/supervisor/*')[0]
except IndexError:
supervisor_log_path = None
if supervisor_log_path:
hidden_paths.append(supervisor_log_path)
# Create and run ad hoc command.
module_args = ' && '.join(['echo %s && test ! -e %s' % (x, x) for x in hidden_paths])
ad_hoc_command = self.create_test_ad_hoc_command(module_name='shell', module_args=module_args, verbosity=2)
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'successful')
self.check_ad_hoc_command_events(ad_hoc_command, 'ok')
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', return_value=('failed', 0))
def test_run_with_bubblewrap_not_installed(self, ignore):
# Enable bubblewrap for this test, specify invalid bubblewrap cmd.
settings.AWX_PROOT_ENABLED = True
settings.AWX_PROOT_CMD = 'PR00T'
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.check_job_result(ad_hoc_command, 'error', expect_traceback=True)
def run_pexpect_mock(self, *args, **kwargs):
return 'successful', 0
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class AdHocCommandApiTest(BaseAdHocCommandTest):
'''
Test API list/detail views for ad hoc commands.
'''
def setUp(self):
super(AdHocCommandApiTest, self).setUp()
self.create_test_credential(user=self.normal_django_user)
def run_test_ad_hoc_command(self, **kwargs):
# Post to list to start a new ad hoc command.
expect = kwargs.pop('expect', 201)
url = kwargs.pop('url', reverse('api:ad_hoc_command_list'))
data = {
'inventory': self.inventory.pk,
'credential': self.credential.pk,
'module_name': 'command',
'module_args': 'uptime',
}
data.update(kwargs)
for k,v in data.items():
if v is None:
del data[k]
return self.post(url, data, expect=expect)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_detail(self, ignore):
with self.current_user('admin'):
response1 = self.run_test_ad_hoc_command()
response2 = self.run_test_ad_hoc_command()
response3 = self.run_test_ad_hoc_command()
# Retrieve detail for ad hoc command. Only GET is supported.
with self.current_user('admin'):
url = reverse('api:ad_hoc_command_detail', args=(response1['id'],))
self.assertEqual(url, response1['url'])
response = self.get(url, expect=200)
self.assertEqual(response['credential'], self.credential.pk)
self.assertEqual(response['related']['credential'],
reverse('api:credential_detail', args=(self.credential.pk,)))
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['related']['inventory'],
reverse('api:inventory_detail', args=(self.inventory.pk,)))
self.assertTrue(response['related']['stdout'])
self.assertTrue(response['related']['cancel'])
self.assertTrue(response['related']['relaunch'])
self.assertTrue(response['related']['events'])
self.assertTrue(response['related']['activity_stream'])
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=204)
self.delete(url, expect=404)
with self.current_user('normal'):
url = reverse('api:ad_hoc_command_detail', args=(response2['id'],))
self.assertEqual(url, response2['url'])
response = self.get(url, expect=200)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=204)
self.delete(url, expect=404)
url = reverse('api:ad_hoc_command_detail', args=(response3['id'],))
self.assertEqual(url, response3['url'])
with self.current_user('other'):
response = self.get(url, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=403)
with self.current_user('nobody'):
response = self.get(url, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=403)
with self.current_user(None):
response = self.get(url, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Verify that the credential and inventory are null when they have
# been deleted, can delete an ad hoc command without inventory or
# credential.
self.credential.delete()
self.inventory.delete()
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['credential'], None)
self.assertEqual(response['inventory'], None)
self.delete(url, expect=204)
self.delete(url, expect=404)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_cancel(self, ignore):
# Override setting so that ad hoc command isn't actually started.
with self.settings(CELERY_UNIT_TEST=False):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Retrieve the cancel URL, should indicate it can be canceled.
url = reverse('api:ad_hoc_command_cancel', args=(response['id'],))
self.assertEqual(url, response['related']['cancel'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], True)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], True)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Cancel ad hoc command (before it starts) and verify the can_cancel
# flag is False and attempts to cancel again fail.
with self.current_user('normal'):
self.post(url, {}, expect=202)
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], False)
self.post(url, {}, expect=403)
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], False)
self.post(url, {}, expect=405)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_relaunch(self, ignore):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Retrieve the relaunch URL, should indicate no passwords are needed
# and it can be relaunched. Relaunch and fetch the new command.
url = reverse('api:ad_hoc_command_relaunch', args=(response['id'],))
self.assertEqual(url, response['related']['relaunch'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=201)
self.assertTrue(response['ad_hoc_command'])
self.get(response['url'], expect=200)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=201)
self.assertTrue(response['ad_hoc_command'])
self.get(response['url'], expect=200)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Try to relaunch ad hoc command when module has been removed from
# allowed list of modules.
try:
ad_hoc_commands = settings.AD_HOC_COMMANDS
settings.AD_HOC_COMMANDS = []
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=400)
finally:
settings.AD_HOC_COMMANDS = ad_hoc_commands
# Try to relaunch after the inventory has been marked inactive.
self.inventory.delete()
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=400)
# Try to relaunch with expired license.
with self.current_user('admin'):
response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk)
self.create_expired_license_file()
with self.current_user('admin'):
self.post(response['related']['relaunch'], {}, expect=403)
def test_ad_hoc_command_events_list(self):
# TODO: Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command()
# Check list of ad hoc command events for a specific ad hoc command.
ad_hoc_command_id = response['id']
url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', args=(ad_hoc_command_id,))
self.assertEqual(url, response['related']['events'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], self.inventory.hosts.count())
for result in response['results']:
self.assertEqual(result['ad_hoc_command'], ad_hoc_command_id)
self.assertTrue(result['id'])
self.assertTrue(result['url'])
self.assertEqual(result['event'], 'runner_on_ok')
self.assertFalse(result['failed'])
self.assertTrue(result['changed'])
self.assertTrue(result['host'] in set(self.inventory.hosts.values_list('pk', flat=True)))
self.assertTrue(result['host_name'] in set(self.inventory.hosts.values_list('name', flat=True)))
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], self.inventory.hosts.count())
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Test top level ad hoc command events list.
url = reverse('api:ad_hoc_command_event_list')
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2 * self.inventory.hosts.count())
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2 * self.inventory.hosts.count())
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 0)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 0)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_event_detail(self):
# TODO: Mock pexpect. Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Check ad hoc command event detail view.
ad_hoc_command_event_ids = AdHocCommandEvent.objects.values_list('pk', flat=True)
with self.current_user('admin'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
response = self.get(url, expect=200)
self.assertTrue(response['ad_hoc_command'])
self.assertEqual(response['id'], ahce_id)
self.assertEqual(response['url'], url)
self.assertEqual(response['event'], 'runner_on_ok')
self.assertFalse(response['failed'])
self.assertTrue(response['changed'])
self.assertTrue(response['host'] in set(self.inventory.hosts.values_list('pk', flat=True)))
self.assertTrue(response['host_name'] in set(self.inventory.hosts.values_list('name', flat=True)))
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=200)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_activity_stream(self, ignore):
# TODO: Test non-enterprise license
self.create_test_license_file()
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Check activity stream for ad hoc command. There should only be one
# entry when it was created; other changes made while running should
# not show up.
url = reverse('api:ad_hoc_command_activity_stream_list', args=(response['id'],))
self.assertEqual(url, response['related']['activity_stream'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
result = response['results'][0]
self.assertTrue(result['id'])
self.assertTrue(result['url'])
self.assertEqual(result['operation'], 'create')
self.assertTrue(result['changes'])
self.assertTrue(result['timestamp'])
self.assertEqual(result['object1'], 'ad_hoc_command')
self.assertEqual(result['object2'], '')
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_host_ad_hoc_commands_list(self):
# TODO: Figure out why this test needs pexpect
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command(limit=self.host2.name)
# Test the ad hoc commands list for a host. Should only return the ad
# hoc command(s) run against that host. Posting should start a new ad
# hoc command and always set the inventory and limit based on URL.
url = reverse('api:host_ad_hoc_commands_list', args=(self.host.pk,))
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.host2.name, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 3)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Try to run with expired license.
self.create_expired_license_file()
with self.current_user('admin'):
self.run_test_ad_hoc_command(url=url, expect=403)
with self.current_user('normal'):
self.run_test_ad_hoc_command(url=url, expect=403)
def test_group_ad_hoc_commands_list(self):
# TODO: Figure out why this test needs pexpect
with self.current_user('admin'):
response = self.run_test_ad_hoc_command() # self.host + self.host2
response = self.run_test_ad_hoc_command(limit=self.group.name) # self.host
response = self.run_test_ad_hoc_command(limit=self.host2.name) # self.host2
# Test the ad hoc commands list for a group. Should return the ad
# hoc command(s) run against any hosts in that group. Posting should
# start a new ad hoc command and always set the inventory and limit
# based on URL.
url = reverse('api:group_ad_hoc_commands_list', args=(self.group.pk,)) # only self.host
url2 = reverse('api:group_ad_hoc_commands_list', args=(self.group2.pk,)) # self.host + self.host2
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2)
response = self.get(url2, expect=200)
self.assertEqual(response['count'], 3)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.group2.name, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 4)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Try to run with expired license.
self.create_expired_license_file()
with self.current_user('admin'):
self.run_test_ad_hoc_command(url=url, expect=403)
with self.current_user('normal'):
self.run_test_ad_hoc_command(url=url, expect=403)
def test_host_ad_hoc_command_events_list(self):
# TODO: Mock run_pexpect. Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Test the ad hoc command events list for a host. Should return the
# events only for that particular host.
url = reverse('api:host_ad_hoc_command_events_list', args=(self.host.pk,))
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)

View File

@ -1,48 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import os
import unittest2 as unittest
from django.conf import settings
from django.test import LiveServerTestCase
from django.test.utils import override_settings
from awx.main.tests.job_base import BaseJobTestMixin
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
class JobTasksTests(BaseJobTestMixin, LiveServerTestCase):
"""A set of tests to ensure that the job_tasks endpoint, available at
`/api/v1/jobs/{id}/job_tasks/`, works as expected.
"""
def setUp(self):
super(JobTasksTests, self).setUp()
settings.INTERNAL_API_URL = self.live_server_url
def test_tasks_endpoint(self):
"""Establish that the `job_tasks` endpoint shows what we expect,
which is a rollup of information about each of the corresponding
job events.
"""
# Create a job
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
job.signal_start()
# Get the initial job event.
event = job.job_events.get(event='playbook_on_play_start')
# Actually make the request for the job tasks.
with self.current_user(self.user_sue):
url = '/api/v1/jobs/%d/job_tasks/?event_id=%d' % (job.id, event.id)
response = self.get(url)
# Test to make sure we got back what we expected.
result = response['results'][0]
self.assertEqual(result['host_count'], 7)
self.assertEqual(result['changed_count'], 7)
self.assertFalse(result['failed'])
self.assertTrue(result['changed'])

View File

@ -1,87 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import StringIO
import sys
import json
# Django
from django.core.management import call_command
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
class BaseCommandMixin(BaseTestMixin):
def create_test_inventories(self):
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization,
variables=json.dumps({'n': n}) if n else '')
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variables = json.dumps({'ho': 'hum-%d' % x})
else:
variables = ''
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
hosts.append(host)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variables = json.dumps({'gee': 'whiz-%d' % x})
else:
variables = ''
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
self.groups.extend(groups)
def run_command(self, name, *args, **options):
'''
Run a management command and capture its stdout/stderr along with any
exceptions.
'''
command_runner = options.pop('command_runner', call_command)
stdin_fileobj = options.pop('stdin_fileobj', None)
options.setdefault('verbosity', 1)
options.setdefault('interactive', False)
original_stdin = sys.stdin
original_stdout = sys.stdout
original_stderr = sys.stderr
if stdin_fileobj:
sys.stdin = stdin_fileobj
sys.stdout = StringIO.StringIO()
sys.stderr = StringIO.StringIO()
result = None
try:
result = command_runner(name, *args, **options)
except Exception as e:
result = e
finally:
captured_stdout = sys.stdout.getvalue()
captured_stderr = sys.stderr.getvalue()
sys.stdin = original_stdin
sys.stdout = original_stdout
sys.stderr = original_stderr
return result, captured_stdout, captured_stderr

View File

@ -1,376 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import json
import os
import shutil
import StringIO
import sys
import time
import unittest2 as unittest
# Django
from django.core.management import call_command
from django.utils.timezone import now
from django.test.utils import override_settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, BaseLiveServerTest
__all__ = ['CreateDefaultOrgTest', 'DumpDataTest', 'CleanupDeletedTest',
'CleanupJobsTest', 'CleanupActivityStreamTest',
'InventoryImportTest']
TEST_PLAYBOOK = '''- hosts: test-group
gather_facts: False
tasks:
- name: should pass
command: test 1 = 1
- name: should also pass
command: test 2 = 2
'''
class BaseCommandMixin(object):
'''
Base class for tests that run management commands.
'''
def create_test_inventories(self):
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization,
variables=json.dumps({'n': n}) if n else '')
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variables = json.dumps({'ho': 'hum-%d' % x})
else:
variables = ''
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
hosts.append(host)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variables = json.dumps({'gee': 'whiz-%d' % x})
else:
variables = ''
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
self.groups.extend(groups)
def run_command(self, name, *args, **options):
'''
Run a management command and capture its stdout/stderr along with any
exceptions.
'''
command_runner = options.pop('command_runner', call_command)
stdin_fileobj = options.pop('stdin_fileobj', None)
options.setdefault('verbosity', 1)
options.setdefault('interactive', False)
original_stdin = sys.stdin
original_stdout = sys.stdout
original_stderr = sys.stderr
if stdin_fileobj:
sys.stdin = stdin_fileobj
sys.stdout = StringIO.StringIO()
sys.stderr = StringIO.StringIO()
result = None
try:
result = command_runner(name, *args, **options)
except Exception as e:
result = e
finally:
captured_stdout = sys.stdout.getvalue()
captured_stderr = sys.stderr.getvalue()
sys.stdin = original_stdin
sys.stdout = original_stdout
sys.stderr = original_stderr
return result, captured_stdout, captured_stderr
class CreateDefaultOrgTest(BaseCommandMixin, BaseTest):
'''
Test cases for create_default_org management command.
'''
def setUp(self):
super(CreateDefaultOrgTest, self).setUp()
self.setup_instances()
def test_create_default_org(self):
self.setup_users()
self.assertEqual(Organization.objects.count(), 0)
result, stdout, stderr = self.run_command('create_preload_data')
self.assertEqual(result, None)
self.assertTrue('Default organization added' in stdout)
self.assertEqual(Organization.objects.count(), 1)
org = Organization.objects.all()[0]
self.assertEqual(org.created_by, self.super_django_user)
self.assertEqual(org.modified_by, self.super_django_user)
result, stdout, stderr = self.run_command('create_preload_data')
self.assertEqual(result, None)
self.assertFalse('Default organization added' in stdout)
self.assertEqual(Organization.objects.count(), 1)
class DumpDataTest(BaseCommandMixin, BaseTest):
'''
Test cases for dumpdata management command.
'''
def setUp(self):
super(DumpDataTest, self).setUp()
self.create_test_inventories()
def test_dumpdata(self):
result, stdout, stderr = self.run_command('dumpdata')
self.assertEqual(result, None)
json.loads(stdout)
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
'''
Test cases for cleanup_jobs management command.
'''
def setUp(self):
super(CleanupJobsTest, self).setUp()
self.test_project_path = None
self.setup_instances()
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.inventory = Inventory.objects.create(name='test-inventory',
description='description for test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.project = None
self.credential = None
self.start_queue()
def tearDown(self):
super(CleanupJobsTest, self).tearDown()
self.terminate_queue()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
def create_test_credential(self, **kwargs):
self.credential = self.make_credential(**kwargs)
return self.credential
def create_test_project(self, playbook_content):
self.project = self.make_projects(self.normal_django_user, 1, playbook_content)[0]
self.organization.projects.add(self.project)
def create_test_job_template(self, **kwargs):
opts = {
'name': 'test-job-template %s' % str(now()),
'inventory': self.inventory,
'project': self.project,
'credential': self.credential,
'job_type': 'run',
}
try:
opts['playbook'] = self.project.playbooks[0]
except (AttributeError, IndexError):
pass
opts.update(kwargs)
self.job_template = JobTemplate.objects.create(**opts)
return self.job_template
def create_test_job(self, **kwargs):
job_template = kwargs.pop('job_template', None)
if job_template:
self.job = job_template.create_job(**kwargs)
else:
opts = {
'name': 'test-job %s' % str(now()),
'inventory': self.inventory,
'project': self.project,
'credential': self.credential,
'job_type': 'run',
}
try:
opts['playbook'] = self.project.playbooks[0]
except (AttributeError, IndexError):
pass
opts.update(kwargs)
self.job = Job.objects.create(**opts)
return self.job
def create_test_ad_hoc_command(self, **kwargs):
opts = {
'inventory': self.inventory,
'credential': self.credential,
'module_name': 'command',
'module_args': 'uptime',
}
opts.update(kwargs)
self.ad_hoc_command = AdHocCommand.objects.create(**opts)
return self.ad_hoc_command
def test_cleanup_jobs(self):
# Test with no jobs to be cleaned up.
jobs_before = Job.objects.all().count()
self.assertFalse(jobs_before)
ad_hoc_commands_before = AdHocCommand.objects.all().count()
self.assertFalse(ad_hoc_commands_before)
result, stdout, stderr = self.run_command('cleanup_jobs')
self.assertEqual(result, None)
jobs_after = Job.objects.all().count()
self.assertEqual(jobs_before, jobs_after)
ad_hoc_commands_after = AdHocCommand.objects.all().count()
self.assertEqual(ad_hoc_commands_before, ad_hoc_commands_after)
# Create and run job.
self.create_test_credential()
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.signal_start())
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'successful')
# Create and run ad hoc command.
ad_hoc_command = self.create_test_ad_hoc_command()
self.assertEqual(ad_hoc_command.status, 'new')
self.assertFalse(ad_hoc_command.passwords_needed_to_start)
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.assertEqual(ad_hoc_command.status, 'successful')
# With days=1, no jobs will be deleted.
jobs_before = Job.objects.all().count()
self.assertTrue(jobs_before)
ad_hoc_commands_before = AdHocCommand.objects.all().count()
self.assertTrue(ad_hoc_commands_before)
result, stdout, stderr = self.run_command('cleanup_jobs', days=1)
self.assertEqual(result, None)
jobs_after = Job.objects.all().count()
self.assertEqual(jobs_before, jobs_after)
ad_hoc_commands_after = AdHocCommand.objects.all().count()
self.assertEqual(ad_hoc_commands_before, ad_hoc_commands_after)
# With days=0 and dry_run=True, no jobs will be deleted.
jobs_before = Job.objects.all().count()
self.assertTrue(jobs_before)
ad_hoc_commands_before = AdHocCommand.objects.all().count()
self.assertTrue(ad_hoc_commands_before)
result, stdout, stderr = self.run_command('cleanup_jobs', days=0,
dry_run=True)
self.assertEqual(result, None)
jobs_after = Job.objects.all().count()
self.assertEqual(jobs_before, jobs_after)
ad_hoc_commands_after = AdHocCommand.objects.all().count()
self.assertEqual(ad_hoc_commands_before, ad_hoc_commands_after)
# With days=0, our job and ad hoc command will be deleted.
jobs_before = Job.objects.all().count()
self.assertTrue(jobs_before)
ad_hoc_commands_before = AdHocCommand.objects.all().count()
self.assertTrue(ad_hoc_commands_before)
result, stdout, stderr = self.run_command('cleanup_jobs', days=0)
self.assertEqual(result, None)
jobs_after = Job.objects.all().count()
self.assertNotEqual(jobs_before, jobs_after)
self.assertFalse(jobs_after)
ad_hoc_commands_after = AdHocCommand.objects.all().count()
self.assertNotEqual(ad_hoc_commands_before, ad_hoc_commands_after)
self.assertFalse(ad_hoc_commands_after)
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class CleanupActivityStreamTest(BaseCommandMixin, BaseTest):
'''
Test cases for cleanup_activitystream management command.
'''
def setUp(self):
super(CleanupActivityStreamTest, self).setUp()
self.start_rabbit()
self.create_test_inventories()
def tearDown(self):
self.stop_rabbit()
super(CleanupActivityStreamTest, self).tearDown()
def test_cleanup(self):
# Should already have entries due to test case setup. With no
# parameters, "days" defaults to 30, which won't cleanup anything.
count_before = ActivityStream.objects.count()
self.assertTrue(count_before)
result, stdout, stderr = self.run_command('cleanup_activitystream')
self.assertEqual(result, None)
count_after = ActivityStream.objects.count()
self.assertEqual(count_before, count_after)
# With days=1, nothing should be changed.
result, stdout, stderr = self.run_command('cleanup_activitystream', days=1)
self.assertEqual(result, None)
count_after = ActivityStream.objects.count()
self.assertEqual(count_before, count_after)
# With days=0 and dry_run=True, nothing should be changed.
result, stdout, stderr = self.run_command('cleanup_activitystream', days=0, dry_run=True)
self.assertEqual(result, None)
count_after = ActivityStream.objects.count()
self.assertEqual(count_before, count_after)
# With days=0, everything should be cleaned up.
result, stdout, stderr = self.run_command('cleanup_activitystream', days=0)
self.assertEqual(result, None)
count_after = ActivityStream.objects.count()
self.assertNotEqual(count_before, count_after)
self.assertFalse(count_after)
# Modify hosts to create 1000 activity stream entries.
t = time.time()
for x in xrange(0, 1000 / Host.objects.count()):
for host in Host.objects.all():
host.name = u'%s-update-%d' % (host.name, x)
host.save()
create_elapsed = time.time() - t
# Time how long it takes to cleanup activity stream, should be no more
# than 1/4 the time taken to create the entries.
count_before = ActivityStream.objects.count()
self.assertTrue(count_before)
t = time.time()
result, stdout, stderr = self.run_command('cleanup_activitystream', days=0)
cleanup_elapsed = time.time() - t
self.assertEqual(result, None)
count_after = ActivityStream.objects.count()
self.assertNotEqual(count_before, count_after)
self.assertFalse(count_after)
self.assertTrue(cleanup_elapsed < (create_elapsed / 4),
'create took %0.3fs, cleanup took %0.3fs, expected < %0.3fs' % (create_elapsed, cleanup_elapsed, create_elapsed / 4))

File diff suppressed because it is too large Load Diff

View File

@ -1,232 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
import os
import unittest2 as unittest
# Django
import django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.job_base import BaseJobTestMixin
import yaml
__all__ = ['JobTemplateLaunchTest', 'JobTemplateLaunchPasswordsTest']
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class JobTemplateLaunchTest(BaseJobTestMixin, django.test.TransactionTestCase):
def setUp(self):
super(JobTemplateLaunchTest, self).setUp()
self.url = reverse('api:job_template_list')
self.data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
credential = self.cred_sue.pk,
playbook = self.proj_dev.playbooks[0],
ask_variables_on_launch = True,
ask_credential_on_launch = True,
)
self.data_no_cred = dict(
name = 'launched job template no credential',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
playbook = self.proj_dev.playbooks[0],
ask_credential_on_launch = True,
ask_variables_on_launch = True,
)
self.data_cred_ask = dict(self.data)
self.data_cred_ask['name'] = 'launched job templated with ask passwords'
self.data_cred_ask['credential'] = self.cred_sue_ask.pk
with self.current_user(self.user_sue):
response = self.post(self.url, self.data, expect=201)
self.launch_url = reverse('api:job_template_launch',
args=(response['id'],))
def test_launch_job_template(self):
with self.current_user(self.user_sue):
self.data['name'] = 'something different'
response = self.post(self.url, self.data, expect=201)
detail_url = reverse('api:job_template_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
def test_no_cred_update_template(self):
# You can still post the job template without a credential, just can't launch it without one
with self.current_user(self.user_sue):
response = self.post(self.url, self.data_no_cred, expect=201)
detail_url = reverse('api:job_template_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
def test_invalid_auth_unauthorized(self):
# Invalid auth can't trigger the launch endpoint
self.check_invalid_auth(self.launch_url, {}, methods=('post',))
def test_credential_implicit(self):
# Implicit, attached credentials
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'new')
def test_launch_extra_vars_json(self):
# Sending extra_vars as a JSON string, implicit credentials
with self.current_user(self.user_sue):
data = dict(extra_vars = '{\"a\":3}')
response = self.post(self.launch_url, data, expect=201)
j = Job.objects.get(pk=response['job'])
ev_dict = yaml.load(j.extra_vars)
self.assertIn('a', ev_dict)
if 'a' in ev_dict:
self.assertEqual(ev_dict['a'], 3)
def test_launch_extra_vars_yaml(self):
# Sending extra_vars as a JSON string, implicit credentials
with self.current_user(self.user_sue):
data = dict(extra_vars = 'a: 3')
response = self.post(self.launch_url, data, expect=201)
j = Job.objects.get(pk=response['job'])
ev_dict = yaml.load(j.extra_vars)
self.assertIn('a', ev_dict)
if 'a' in ev_dict:
self.assertEqual(ev_dict['a'], 3)
def test_credential_explicit(self):
# Explicit, credential
with self.current_user(self.user_sue):
self.cred_sue.delete()
response = self.post(self.launch_url, {'credential': self.cred_doug.pk}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_explicit_via_credential_id(self):
# Explicit, credential
with self.current_user(self.user_sue):
self.cred_sue.delete()
response = self.post(self.launch_url, {'credential_id': self.cred_doug.pk}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_override(self):
# Explicit, credential
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'credential': self.cred_doug.pk}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_override_via_credential_id(self):
# Explicit, credential
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'credential_id': self.cred_doug.pk}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_bad_credential_launch_fail(self):
# Can't launch a job template without a credential defined (or if we
# pass an invalid/inactive credential value).
with self.current_user(self.user_sue):
self.cred_sue.delete()
self.post(self.launch_url, {}, expect=400)
self.post(self.launch_url, {'credential': 0}, expect=400)
self.post(self.launch_url, {'credential_id': 0}, expect=400)
self.post(self.launch_url, {'credential': 'one'}, expect=400)
self.post(self.launch_url, {'credential_id': 'one'}, expect=400)
cred_doug_pk = self.cred_doug.pk
self.cred_doug.delete()
self.post(self.launch_url, {'credential': cred_doug_pk}, expect=400)
self.post(self.launch_url, {'credential_id': cred_doug_pk}, expect=400)
def test_explicit_unowned_cred(self):
# Explicitly specify a credential that we don't have access to
with self.current_user(self.user_juan):
launch_url = reverse('api:job_template_launch',
args=(self.jt_eng_run.pk,))
self.post(launch_url, {'credential_id': self.cred_sue.pk}, expect=403)
def test_no_project_fail(self):
# Job Templates without projects cannot be launched
with self.current_user(self.user_sue):
self.data['name'] = "missing proj"
response = self.post(self.url, self.data, expect=201)
jt = JobTemplate.objects.get(pk=response['id'])
jt.project = None
jt.save()
launch_url2 = reverse('api:job_template_launch',
args=(response['id'],))
self.post(launch_url2, {}, expect=400)
def test_no_inventory_fail(self):
# Job Templates without inventory cannot be launched
with self.current_user(self.user_sue):
self.data['name'] = "missing inv"
response = self.post(self.url, self.data, expect=201)
jt = JobTemplate.objects.get(pk=response['id'])
jt.inventory = None
jt.save()
launch_url3 = reverse('api:job_template_launch',
args=(response['id'],))
self.post(launch_url3, {}, expect=400)
def test_deleted_credential_fail(self):
# Job Templates with deleted credentials cannot be launched.
self.cred_sue.delete()
with self.current_user(self.user_sue):
self.post(self.launch_url, {}, expect=400)
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class JobTemplateLaunchPasswordsTest(BaseJobTestMixin, django.test.TransactionTestCase):
def setUp(self):
super(JobTemplateLaunchPasswordsTest, self).setUp()
self.url = reverse('api:job_template_list')
self.data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
credential = self.cred_sue_ask.pk,
playbook = self.proj_dev.playbooks[0],
ask_credential_on_launch = True,
)
with self.current_user(self.user_sue):
response = self.post(self.url, self.data, expect=201)
self.launch_url = reverse('api:job_template_launch',
args=(response['id'],))
# should return explicit credentials required passwords
def test_explicit_cred_with_ask_passwords_fail(self):
passwords_required = ['ssh_password', 'become_password', 'ssh_key_unlock']
# Job Templates with deleted credentials cannot be launched.
with self.current_user(self.user_sue):
self.cred_sue_ask.delete()
response = self.post(self.launch_url, {'credential_id': self.cred_sue_ask_many.pk}, expect=400)
for p in passwords_required:
self.assertIn(p, response['passwords_needed_to_start'])
self.assertEqual(len(passwords_required), len(response['passwords_needed_to_start']))
def test_explicit_cred_with_ask_password(self):
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'ssh_password': 'whatever'}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
def test_explicit_cred_with_ask_password_empty_string_fail(self):
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'ssh_password': ''}, expect=400)
self.assertIn('ssh_password', response['passwords_needed_to_start'])

View File

@ -1,79 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
import json
import os
import unittest2 as unittest
# Django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tests.job_base import BaseJobTestMixin
__all__ = ['JobRelaunchTest',]
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class JobRelaunchTest(BaseJobTestMixin, BaseLiveServerTest):
def test_job_relaunch(self):
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
url = reverse('api:job_relaunch', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
self.assertEqual(j.launch_type, 'relaunch')
# Test with a job that prompts for SSH and sudo passwords.
job = self.make_job(self.jt_sup_run, self.user_sue, 'success')
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_password', 'become_password']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_password'] = 'sshpass'
response = self.post(url, data, expect=400)
data2 = dict(become_password='sudopass')
response = self.post(url, data2, expect=400)
data.update(data2)
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
# Create jt with no extra_vars
# Launch j1 with runtime extra_vars
# Assign extra_vars to jt backing job
# Relaunch j1
# j2 should not contain jt extra_vars
def test_relaunch_job_does_not_inherit_jt_extra_vars(self):
jt_extra_vars = {
"hello": "world"
}
j_extra_vars = {
"goodbye": "cruel universe"
}
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success', extra_vars=j_extra_vars)
url = reverse('api:job_relaunch', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
self.jt_ops_east_run.extra_vars = jt_extra_vars
self.jt_ops_east_run.save()
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
resp_extra_vars = json.loads(response['extra_vars'])
self.assertNotIn("hello", resp_extra_vars)
self.assertEqual(resp_extra_vars, j_extra_vars)

File diff suppressed because it is too large Load Diff

View File

@ -1,248 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
import os
import unittest2 as unittest
# Django
from django.core.urlresolvers import reverse
from django.conf import settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tests.job_base import BaseJobTestMixin
__all__ = ['JobStartCancelTest',]
@unittest.skipIf(os.environ.get('SKIP_SLOW_TESTS', False), 'Skipping slow test')
class JobStartCancelTest(BaseJobTestMixin, BaseLiveServerTest):
def test_job_start(self):
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
url = reverse('api:job_start', args=(job.pk,))
# Test with no auth and with invalid login.
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# Sue can start a job (when passwords are already saved) as long as the
# status is new. Reverse list so "new" will be last.
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
if status == 'waiting':
continue
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status == 'new':
self.assertTrue(response['can_start'])
self.assertFalse(response['passwords_needed_to_start'])
# response = self.post(url, {}, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'successful',
# job.result_stdout)
else:
self.assertFalse(response['can_start'])
response = self.post(url, {}, expect=405)
# Test with a job that prompts for SSH and sudo become passwords.
#job = self.job_sup_run
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_password', 'become_password']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_password'] = 'sshpass'
response = self.post(url, data, expect=400)
data2 = dict(become_password='sudopass')
response = self.post(url, data2, expect=400)
data.update(data2)
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
# FIXME: Test run gets the following error in this case:
# fatal: [hostname] => sudo output closed while waiting for password prompt:
#self.assertEqual(job.status, 'successful')
# Test with a job that prompts for SSH unlock key, given the wrong key.
#job = self.jt_ops_west_run.create_job(
# credential=self.cred_greg,
# created_by=self.user_sue,
#)
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
job.credential = self.cred_greg
job.save()
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
# The job should start but fail.
data['ssh_key_unlock'] = 'sshunlock'
response = self.post(url, data, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'failed')
# Test with a job that prompts for SSH unlock key, given the right key.
from awx.main.tests.data.ssh import TEST_SSH_KEY_DATA_UNLOCK
# job = self.jt_ops_west_run.create_job(
# credential=self.cred_greg,
# created_by=self.user_sue,
# )
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
job.credential = self.cred_greg
job.save()
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
response = self.post(url, data, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'successful')
# FIXME: Test with other users, test when passwords are required.
def test_job_cancel(self):
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
url = reverse('api:job_cancel', args=(job.pk,))
# Test with no auth and with invalid login.
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# sue can cancel the job, but only when it is pending or running.
for status in [x[0] for x in Job.STATUS_CHOICES]:
if status == 'waiting':
continue
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status in ('new', 'pending', 'waiting', 'running'):
self.assertTrue(response['can_cancel'])
response = self.post(url, {}, expect=202)
else:
self.assertFalse(response['can_cancel'])
response = self.post(url, {}, expect=405)
# FIXME: Test with other users.
def test_get_job_results(self):
# Start/run a job and then access its results via the API.
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
job.signal_start()
# Check that the job detail has been updated.
url = reverse('api:job_detail', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(response['status'], 'successful',
response['result_traceback'])
self.assertTrue(response['result_stdout'])
# Test job events for completed job.
url = reverse('api:job_job_events_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test individual job event detail records.
host_ids = set()
for job_event in job.job_events.all():
if job_event.host:
host_ids.add(job_event.host.pk)
url = reverse('api:job_event_detail', args=(job_event.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Also test job event list for each host.
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_events_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job event list for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_events_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_events.all()
self.assertTrue(qs.count(), group)
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test global job event list.
url = reverse('api:job_event_list')
with self.current_user(self.user_sue):
response = self.get(url)
qs = JobEvent.objects.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for completed job.
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Every host referenced by a job_event should be present as a job
# host summary record.
self.assertEqual(host_ids,
set(qs.values_list('host__pk', flat=True)))
# Test individual job host summary records.
for job_host_summary in job.job_host_summaries.all():
url = reverse('api:job_host_summary_detail',
args=(job_host_summary.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Test job host summaries for each host.
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)

View File

@ -1,240 +0,0 @@
# Python
import json
# Django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, QueueStartStopTestMixin
__all__ = ['SurveyPasswordRedactedTest']
PASSWORD="5m/h"
ENCRYPTED_STR='$encrypted$'
TEST_PLAYBOOK = u'''---
- name: test success
hosts: test-group
gather_facts: True
tasks:
- name: should pass
command: echo {{ %s }}
''' % ('spot_speed')
TEST_SIMPLE_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "%s",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "%s"
}
]
}
''' % ('spot_speed', PASSWORD)
TEST_COMPLEX_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "spot_speed",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "0m/h"
},
{
"type": "password",
"question_name": "ssn",
"question_description": "What's your social security number?",
"variable": "ssn",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "999-99-9999"
},
{
"type": "password",
"question_name": "bday",
"question_description": "What's your birth day?",
"variable": "bday",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "1/1/1970"
}
]
}
'''
TEST_SINGLE_PASSWORDS = [
{
'description': 'Single instance with a . after',
'text' : 'See spot. See spot run. See spot run %s. That is a fast run.' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Single instance with , after',
'text': 'Spot goes %s, at a fast pace' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Single instance with a space after',
'text': 'Is %s very fast?' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Many instances, also with newline',
'text': 'I think %s is very very fast. If I ran %s for 4 hours how many hours would I '
'run?.\nTrick question. %s for 4 hours would result in running for 4 hours' % (PASSWORD, PASSWORD, PASSWORD),
'passwords': [PASSWORD],
'occurances': 3,
},
]
passwd = 'my!@#$%^pass&*()_+'
TEST_SINGLE_PASSWORDS.append({
'description': 'password includes characters not in a-z 0-9 range',
'passwords': [passwd],
'text': 'Text is fun yeah with passwords %s.' % passwd,
'occurances': 1
})
# 3 because 3 password fields in spec TEST_COMPLEX_SURVEY
TEST_MULTIPLE_PASSWORDS = []
passwds = [ '65km/s', '545-83-4534', '7/4/2002']
TEST_MULTIPLE_PASSWORDS.append({
'description': '3 different passwords each used once',
'text': 'Spot runs %s. John has an ss of %s and is born on %s.' % (passwds[0], passwds[1], passwds[2]),
'passwords': passwds,
'occurances': 3,
})
TESTS = {
'simple': {
'survey' : json.loads(TEST_SIMPLE_SURVEY),
'tests' : TEST_SINGLE_PASSWORDS,
},
'complex': {
'survey' : json.loads(TEST_COMPLEX_SURVEY),
'tests' : TEST_MULTIPLE_PASSWORDS,
}
}
class SurveyPasswordBaseTest(BaseTest, QueueStartStopTestMixin):
def setUp(self):
super(SurveyPasswordBaseTest, self).setUp()
self.setup_instances()
self.setup_users()
def check_passwords_redacted(self, test, response):
self.assertIsNotNone(response['content'])
for password in test['passwords']:
self.check_not_found(response['content'], password, test['description'], word_boundary=True)
self.check_found(response['content'], ENCRYPTED_STR, test['occurances'], test['description'])
# TODO: A more complete test would ensure that the variable value isn't found
def check_extra_vars_redacted(self, test, response):
self.assertIsNotNone(response)
# Ensure that all extra_vars of type password have the value '$encrypted$'
vars = []
for question in test['survey']['spec']:
if question['type'] == 'password':
vars.append(question['variable'])
extra_vars = json.loads(response['extra_vars'])
for var in vars:
self.assertIn(var, extra_vars, 'Variable "%s" should exist in "%s"' % (var, extra_vars))
self.assertEqual(extra_vars[var], ENCRYPTED_STR)
def _get_url_job_stdout(self, job):
url = reverse('api:job_stdout', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
def _get_url_job_details(self, job):
url = reverse('api:job_detail', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
class SurveyPasswordRedactedTest(SurveyPasswordBaseTest):
'''
Transpose TEST[]['tests'] to the below format. A more flat format."
[
{
'text': '...',
'description': '...',
...,
'job': '...',
'survey': '...'
},
]
'''
def setup_test(self, test_name):
blueprint = TESTS[test_name]
self.tests[test_name] = []
job_template = self.make_job_template(survey_enabled=True, survey_spec=blueprint['survey'])
for test in blueprint['tests']:
test = dict(test)
extra_vars = {}
# build extra_vars from spec variables and passwords
for x in range(0, len(blueprint['survey']['spec'])):
question = blueprint['survey']['spec'][x]
extra_vars[question['variable']] = test['passwords'][x]
job = self.make_job(job_template=job_template)
job.extra_vars = json.dumps(extra_vars)
job.result_stdout_text = test['text']
job.save()
test['job'] = job
test['survey'] = blueprint['survey']
self.tests[test_name].append(test)
def setUp(self):
super(SurveyPasswordRedactedTest, self).setUp()
self.tests = {}
self.setup_test('simple')
self.setup_test('complex')
# should redact single variable survey
def test_redact_stdout_simple_survey(self):
for test in self.tests['simple']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact multiple variables survey
def test_redact_stdout_complex_survey(self):
for test in self.tests['complex']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact values in extra_vars
def test_redact_job_extra_vars(self):
for test in self.tests['simple']:
response = self._get_url_job_details(test['job'])
self.check_extra_vars_redacted(test, response)

File diff suppressed because it is too large Load Diff

View File

@ -1,210 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import datetime
# Django
from django.core.urlresolvers import reverse
from django.utils.timezone import now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['ScheduleTest']
YESTERDAY = (datetime.date.today() - datetime.timedelta(1)).strftime('%Y%m%dT075000Z')
UNTIL_SCHEDULE = "DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=1;UNTIL=30230401T075000Z" % YESTERDAY
EXPIRED_SCHEDULES = ["DTSTART:19340331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5"]
INFINITE_SCHEDULES = ["DTSTART:30340331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10"]
GOOD_SCHEDULES = ["DTSTART:20500331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1",
"DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=1;UNTIL=20230401T075000Z" % YESTERDAY,
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=1;BYDAY=MO,WE,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=5;BYDAY=MO",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYMONTHDAY=6",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=4;BYDAY=SU",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=-1;BYDAY=MO,TU,WE,TH,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=-1;BYDAY=MO,TU,WE,TH,FR,SA,SU",
"DTSTART:20140331T075000Z RRULE:FREQ=YEARLY;INTERVAL=1;BYMONTH=4;BYMONTHDAY=1",
"DTSTART:20140331T075000Z RRULE:FREQ=YEARLY;INTERVAL=1;BYSETPOS=-1;BYMONTH=8;BYDAY=SU",
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20230401T075000Z;BYDAY=MO,WE,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=HOURLY;INTERVAL=1;UNTIL=20230610T075000Z",
"DTSTART:20140411T040000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20140411T040000Z;BYDAY=WE"]
BAD_SCHEDULES = ["", "DTSTART:20140331T055000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=10000000",
"DTSTART;TZID=US-Eastern:19961105T090000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYDAY=20MO;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=MONTHLY;BYMONTHDAY=10,15;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYMONTH=1,2;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYYEARDAY=120;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYWEEKNO=10;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 RRULE:FREQ=HOURLY;INTERVAL=1"]
class ScheduleTest(BaseTest):
def setUp(self):
super(ScheduleTest, self).setUp()
self.start_rabbit()
self.setup_instances()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.organizations[0].admin_role.members.add(self.normal_django_user)
self.organizations[0].member_role.members.add(self.other_django_user)
self.organizations[0].member_role.members.add(self.normal_django_user)
self.diff_org_user = self.make_user('fred')
self.organizations[1].member_role.members.add(self.diff_org_user)
self.cloud_source = Credential.objects.create(kind='awx', username='Dummy', password='Dummy')
self.cloud_source.admin_role.members.add(self.super_django_user)
self.first_inventory = Inventory.objects.create(name='test_inventory', description='for org 0', organization=self.organizations[0])
self.first_inventory.hosts.create(name='host_1')
self.first_inventory_group = self.first_inventory.groups.create(name='group_1')
self.first_inventory_source = self.first_inventory_group.inventory_source
self.first_inventory_source.source = 'ec2'
self.first_inventory_source.save()
self.first_inventory.read_role.members.add(self.other_django_user)
self.second_inventory = Inventory.objects.create(name='test_inventory_2', description='for org 0', organization=self.organizations[0])
self.second_inventory.hosts.create(name='host_2')
self.second_inventory_group = self.second_inventory.groups.create(name='group_2')
self.second_inventory_source = self.second_inventory_group.inventory_source
self.second_inventory_source.source = 'ec2'
self.second_inventory_source.save()
self.first_schedule = Schedule.objects.create(name='test_schedule_1', unified_job_template=self.first_inventory_source,
enabled=True, rrule=GOOD_SCHEDULES[0])
self.second_schedule = Schedule.objects.create(name='test_schedule_2', unified_job_template=self.second_inventory_source,
enabled=True, rrule=GOOD_SCHEDULES[0])
self.without_valid_source_inventory = Inventory.objects.create(name='without valid source', description='for org 0', organization=self.organizations[0])
self.without_valid_source_inventory.hosts.create(name='host_3')
self.without_valid_source_inventory_group = self.without_valid_source_inventory.groups.create(name='not valid source')
self.without_valid_source_inventory_source = self.without_valid_source_inventory_group.inventory_source
def tearDown(self):
super(ScheduleTest, self).tearDown()
self.stop_rabbit()
def test_schedules_list(self):
url = reverse('api:schedule_list')
enabled_schedules = Schedule.objects.filter(enabled=True).distinct()
empty_schedules = Schedule.objects.none()
org_1_schedules = Schedule.objects.filter(unified_job_template=self.first_inventory_source)
#Super user can see everything
self.check_get_list(url, self.super_django_user, enabled_schedules)
# Unauth user should have no access
self.check_invalid_auth(url)
# regular org user with read permission can see only their schedules
self.check_get_list(url, self.other_django_user, org_1_schedules)
# other org user with no read perm can't see anything
self.check_get_list(url, self.diff_org_user, empty_schedules)
def test_post_new_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
reverse('api:inventory_source_schedules_list', args=(self.second_inventory_source.pk,))
new_schedule = dict(name='newsched_1', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
# No auth should fail
self.check_invalid_auth(first_url, new_schedule, methods=('post',))
# Super user can post a new schedule
with self.current_user(self.super_django_user):
self.post(first_url, data=new_schedule, expect=201)
# #admin can post
admin_schedule = dict(name='newsched_2', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
self.post(first_url, data=admin_schedule, expect=201, auth=self.get_normal_credentials())
#normal user without write access can't post
unauth_schedule = dict(name='newsched_3', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.other_django_user):
self.post(first_url, data=unauth_schedule, expect=403)
#give normal user write access and then they can post
self.first_inventory.admin_role.members.add(self.other_django_user)
auth_schedule = unauth_schedule
with self.current_user(self.other_django_user):
self.post(first_url, data=auth_schedule, expect=201)
# another org user shouldn't be able to post a schedule to this org's schedule
diff_user_schedule = dict(name='newsched_4', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.diff_org_user):
self.post(first_url, data=diff_user_schedule, expect=403)
def test_post_schedule_to_non_cloud_source(self):
invalid_inv_url = reverse('api:inventory_source_schedules_list', args=(self.without_valid_source_inventory_source.pk,))
new_schedule = dict(name='newsched_1', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.super_django_user):
self.post(invalid_inv_url, data=new_schedule, expect=400)
def test_update_existing_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
new_schedule = dict(name='edit_schedule', description='going to change', enabled=True, rrule=EXPIRED_SCHEDULES[0])
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule, expect=201)
self.assertEquals(data['next_run'], None)
new_schedule_url = reverse('api:schedule_detail', args=(data['id'],))
data['rrule'] = GOOD_SCHEDULES[0]
with self.current_user(self.normal_django_user):
data = self.put(new_schedule_url, data=data, expect=200)
self.assertNotEqual(data['next_run'], None)
#TODO: Test path needed for non org-admin users, but rather regular users who have permission to create the JT associated with the Schedule
def test_infinite_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
new_schedule = dict(name='inf_schedule', description='going forever', enabled=True, rrule=INFINITE_SCHEDULES[0])
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule, expect=201)
self.assertEquals(data['dtend'], None)
long_schedule = dict(name='long_schedule', description='going for a long time', enabled=True, rrule=UNTIL_SCHEDULE)
with self.current_user(self.normal_django_user):
data = self.post(first_url, long_schedule, expect=201)
self.assertNotEquals(data['dtend'], None)
def test_schedule_filtering(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
start_time = now() + datetime.timedelta(minutes=5)
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule = dict(name="filter_schedule_1", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
self.post(first_url, new_schedule, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now(), now() + datetime.timedelta(minutes=10)).count(), 1)
start_time = now()
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule_middle = dict(name="runnable_schedule", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
self.post(first_url, new_schedule_middle, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now() - datetime.timedelta(minutes=10), now() + datetime.timedelta(minutes=10)).count(), 1)
def test_rrule_validation(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
with self.current_user(self.normal_django_user):
for good_rule in GOOD_SCHEDULES:
sched_dict = dict(name=good_rule, enabled=True, rrule=good_rule)
self.post(first_url, sched_dict, expect=201)
for bad_rule in BAD_SCHEDULES:
sched_dict = dict(name=bad_rule, enabled=True, rrule=bad_rule)
self.post(first_url, sched_dict, expect=400)

View File

@ -1,402 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import json
import os
import subprocess
import sys
import urlparse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
__all__ = ['InventoryScriptTest']
class BaseScriptTest(BaseLiveServerTest):
'''
Base class for tests that run external scripts to access the API.
'''
def setUp(self):
super(BaseScriptTest, self).setUp()
self._sys_path = [x for x in sys.path]
self._environ = dict(os.environ.items())
self._temp_files = []
def tearDown(self):
super(BaseScriptTest, self).tearDown()
sys.path = self._sys_path
for k,v in self._environ.items():
if os.environ.get(k, None) != v:
os.environ[k] = v
for k,v in os.environ.items():
if k not in self._environ.keys():
del os.environ[k]
for tf in self._temp_files:
if os.path.exists(tf):
os.remove(tf)
def run_script(self, name, *args, **options):
'''
Run an external script and capture its stdout/stderr and return code.
'''
#stdin_fileobj = options.pop('stdin_fileobj', None)
pargs = [name]
for k,v in options.items():
pargs.append('%s%s' % ('-' if len(k) == 1 else '--', k))
if v is not True:
pargs.append(str(v))
for arg in args:
pargs.append(str(arg))
proc = subprocess.Popen(pargs, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
return proc.returncode, stdout, stderr
class InventoryScriptTest(BaseScriptTest):
'''
Test helper to run management command as standalone script.
'''
def setUp(self):
super(InventoryScriptTest, self).setUp()
self.start_rabbit()
self.setup_instances()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization,
variables=json.dumps({'n': n}) if n else '')
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variables = json.dumps({'ho': 'hum-%d' % x})
else:
variables = ''
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
#if x in (3, 7):
# host.delete()
# continue
hosts.append(host)
# add localhost just to make sure it's thrown into all (Ansible github bug)
local = inventory.hosts.create(name='localhost', inventory=inventory, variables={})
hosts.append(local)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variables = json.dumps({'gee': 'whiz-%d' % x})
else:
variables = ''
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
#if x == 2:
# #group.delete()
# #continue
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
if x == 4:
group.hosts.add(local)
self.groups.extend(groups)
hosts[3].delete()
hosts[7].delete()
groups[2].delete()
def tearDown(self):
super(InventoryScriptTest, self).tearDown()
self.stop_rabbit()
def run_inventory_script(self, *args, **options):
rest_api_url = self.live_server_url
parts = urlparse.urlsplit(rest_api_url)
username, password = self.get_super_credentials()
netloc = '%s:%s@%s' % (username, password, parts.netloc)
rest_api_url = urlparse.urlunsplit([parts.scheme, netloc, parts.path,
parts.query, parts.fragment])
os.environ.setdefault('REST_API_URL', rest_api_url)
#os.environ.setdefault('REST_API_TOKEN',
# self.super_django_user.auth_token.key)
name = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins',
'inventory', 'awxrest.py')
return self.run_script(name, *args, **options)
def test_without_inventory_id(self):
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[0].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_list_with_inventory_id_as_argument(self):
inventory = self.inventories[0]
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups
groupnames = [ x for x in groups.values_list('name', flat=True)]
# it's ok for all to be here because due to an Ansible inventory workaround
# 127.0.0.1/localhost must show up in the all group
groupnames.append('all')
self.assertEqual(set(data.keys()), set(groupnames))
# Groups for this inventory should only have hosts, and no group
# variable data or parent/child relationships.
for k,v in data.items():
if k != 'all':
assert isinstance(v, dict)
assert isinstance(v['children'], (list,tuple))
assert isinstance(v['hosts'], (list,tuple))
assert isinstance(v['vars'], (dict))
group = inventory.groups.get(name=k)
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v['hosts']), set(hostnames))
else:
assert v['hosts'] == ['host-00-02.example.com', 'localhost']
# Command line argument for inventory ID should take precedence over
# environment variable.
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(9999) if x not in inventory_pks][0]
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
def test_list_with_inventory_id_in_environment(self):
inventory = self.inventories[1]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(name=k)
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
assert len(v['children']) == 0
def test_list_with_hostvars_inline(self):
inventory = self.inventories[1]
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk,
hostvars=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups
groupnames = list(groups.values_list('name', flat=True))
groupnames.extend(['all', '_meta'])
self.assertEqual(set(data.keys()), set(groupnames))
all_hostnames = set()
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
if k == '_meta':
continue
group = inventory.groups.get(name=k)
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
all_hostnames.update(hostnames)
assert set(v.get('hosts', [])) == set(hostnames)
if group.variables:
assert v.get('vars', {}) == group.variables_dict
if k == 'group-3':
children = group.children
childnames = children.values_list('name', flat=True)
assert set(v.get('children', [])) == set(childnames)
else:
assert len(v['children']) == 0
# Check hostvars in ['_meta']['hostvars'] dict.
for hostname in all_hostnames:
assert hostname in data['_meta']['hostvars']
host = inventory.hosts.get(name=hostname)
self.assertEqual(data['_meta']['hostvars'][hostname],
host.variables_dict)
# Hostvars can also be requested via environment variable.
os.environ['INVENTORY_HOSTVARS'] = str(True)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
assert '_meta' in data
def test_valid_host(self):
# Host without variable data.
inventory = self.inventories[0]
host = inventory.hosts.all()[2]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertEqual(data, {})
# Host with variable data.
inventory = self.inventories[1]
host = inventory.hosts.all()[4]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertEqual(data, host.variables_dict)
def test_invalid_host(self):
# Valid host, but not part of the specified inventory.
inventory = self.inventories[0]
host = Host.objects.exclude(inventory=inventory)[0]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
# Invalid hostname not in database.
rc, stdout, stderr = self.run_inventory_script(host='blah.example.com')
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_invalid_inventory_id(self):
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(1, 9999) if x not in inventory_pks][0]
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = 'not_an_int'
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[1].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = 'not_an_int'
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[2].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_deleted_inventory(self):
inventory = self.inventories[0]
pk = inventory.pk
inventory.delete()
os.environ['INVENTORY_ID'] = str(pk)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_without_list_or_host_argument(self):
inventory = self.inventories[0]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script()
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_both_list_and_host_arguments(self):
inventory = self.inventories[0]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(list=True, host='blah')
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_disabled_hosts(self):
inventory = self.inventories[1]
for host in inventory.hosts.filter(enabled=True):
host.enabled = False
host.save(update_fields=['enabled'])
os.environ['INVENTORY_ID'] = str(inventory.pk)
# Load inventory list as normal (only enabled hosts).
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(name=k)
hosts = group.hosts.filter(enabled=True)
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
self.assertFalse(hostnames)
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
assert len(v['children']) == 0
# Load inventory list with all hosts.
rc, stdout, stderr = self.run_inventory_script(list=True, all=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(name=k)
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
assert hostnames
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
assert len(v['children']) == 0

View File

@ -1,128 +0,0 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
import pytest
from awx.main.tests.base import BaseTest
from awx.main.models import * # noqa
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
TEST_TOWER_SETTINGS_MANIFEST = {
"TEST_SETTING_INT": {
"name": "An Integer Field",
"description": "An Integer Field",
"default": 1,
"type": "int",
"category": "test"
},
"TEST_SETTING_STRING": {
"name": "A String Field",
"description": "A String Field",
"default": "test",
"type": "string",
"category": "test"
},
"TEST_SETTING_BOOL": {
"name": "A Bool Field",
"description": "A Bool Field",
"default": True,
"type": "bool",
"category": "test"
},
"TEST_SETTING_LIST": {
"name": "A List Field",
"description": "A List Field",
"default": ["A", "Simple", "List"],
"type": "list",
"category": "test"
},
"TEST_SETTING_JSON": {
"name": "A JSON Field",
"description": "A JSON Field",
"default": {"key": "value", "otherkey": ["list", "of", "things"]},
"type": "json",
"category": "test"
}
}
@override_settings(TOWER_SETTINGS_MANIFEST=TEST_TOWER_SETTINGS_MANIFEST)
@pytest.mark.skip(reason="Settings deferred to 3.1")
class SettingsPlaceholder(BaseTest):
def setUp(self):
super(SettingsTest, self).setUp()
self.setup_instances()
self.setup_users()
def get_settings(self, expected_count=5):
result = self.get(reverse('api:settings_list'), expect=200)
self.assertEqual(result['count'], expected_count)
return result['results']
def get_individual_setting(self, setting):
all_settings = self.get_settings()
setting_actual = None
for setting_item in all_settings:
if setting_item['key'] == setting:
setting_actual = setting_item
break
self.assertIsNotNone(setting_actual)
return setting_actual
def set_setting(self, key, value):
self.post(reverse('api:settings_list'), data={"key": key, "value": value}, expect=201)
def test_get_settings(self):
# Regular user should see nothing (no user settings yet)
with self.current_user(self.normal_django_user):
self.get_settings(expected_count=0)
# anonymous user should get a 401
self.get(reverse('api:settings_list'), expect=401)
# super user can see everything
with self.current_user(self.super_django_user):
self.get_settings(expected_count=len(TEST_TOWER_SETTINGS_MANIFEST))
def set_and_reset_setting(self, key, values, expected_values=()):
settings_reset = reverse('api:settings_reset')
setting = self.get_individual_setting(key)
self.assertEqual(setting['value'], TEST_TOWER_SETTINGS_MANIFEST[key]['default'])
for n, value in enumerate(values):
self.set_setting(key, value)
setting = self.get_individual_setting(key)
if len(expected_values) > n:
self.assertEqual(setting['value'], expected_values[n])
else:
self.assertEqual(setting['value'], value)
self.post(settings_reset, data={"key": key}, expect=204)
setting = self.get_individual_setting(key)
self.assertEqual(setting['value'], TEST_TOWER_SETTINGS_MANIFEST[key]['default'])
def test_set_and_reset_settings(self):
with self.current_user(self.super_django_user):
self.set_and_reset_setting('TEST_SETTING_INT', (2, 0))
self.set_and_reset_setting('TEST_SETTING_STRING', ('blah', '', u'\u2620'))
self.set_and_reset_setting('TEST_SETTING_BOOL', (True, False))
# List values are always saved as strings.
self.set_and_reset_setting('TEST_SETTING_LIST', ([4, 5, 6], [], [2]), (['4', '5', '6'], [], ['2']))
self.set_and_reset_setting('TEST_SETTING_JSON', ({"k": "v"}, {}, [], [7, 8], 'str'))
def test_clear_all_settings(self):
settings_list = reverse('api:settings_list')
with self.current_user(self.super_django_user):
self.set_setting('TEST_SETTING_INT', 2)
self.set_setting('TEST_SETTING_STRING', "foo")
self.set_setting('TEST_SETTING_BOOL', False)
self.set_setting('TEST_SETTING_LIST', [1,2,3])
self.set_setting('TEST_SETTING_JSON', '{"key": "new value"}')
all_settings = self.get_settings()
for setting_entry in all_settings:
self.assertNotEqual(setting_entry['value'],
TEST_TOWER_SETTINGS_MANIFEST[setting_entry['key']]['default'])
self.delete(settings_list, expect=200)
all_settings = self.get_settings()
for setting_entry in all_settings:
self.assertEqual(setting_entry['value'],
TEST_TOWER_SETTINGS_MANIFEST[setting_entry['key']]['default'])

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff