1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

Bypass task runner system in normal job start tests... we'll test it another way so assume we want to just start the job right away

This commit is contained in:
Matthew Jones 2014-03-19 11:09:14 -04:00
parent 1093b00e2a
commit 14e138a7bd
8 changed files with 180 additions and 242 deletions

View File

@ -709,7 +709,10 @@ class InventorySource(PrimordialModel):
def update(self, **kwargs):
if self.can_update:
inventory_update = self.inventory_updates.create()
inventory_update.signal_start(**kwargs)
if hasattr(settings, 'CELERY_UNIT_TEST'):
inventory_update.start(None, **kwargs)
else:
inventory_update.signal_start(**kwargs)
return inventory_update
def get_absolute_url(self):

View File

@ -385,6 +385,8 @@ class Job(CommonTask):
return dependencies
def signal_start(self, **kwargs):
if hasattr(settings, 'CELERY_UNIT_TEST'):
return self.start(None, **kwargs)
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()

View File

@ -295,7 +295,10 @@ class Project(CommonModel):
def update(self, **kwargs):
if self.can_update:
project_update = self.project_updates.create()
project_update.signal_start(**kwargs)
if hasattr(settings, 'CELERY_UNIT_TEST'):
project_update.start(None, **kwargs)
else:
project_update.signal_start(**kwargs)
return project_update
def get_absolute_url(self):

View File

@ -323,13 +323,11 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.project = None
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(CleanupJobsTest, self).tearDown()
self.terminate_queue()
self.terminate_taskmanager()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)

View File

@ -32,7 +32,7 @@ class InventoryTest(BaseTest):
self.inventory_a = Inventory.objects.create(name='inventory-a', description='foo', organization=self.organizations[0])
self.inventory_b = Inventory.objects.create(name='inventory-b', description='bar', organization=self.organizations[1])
# the normal user is an org admin of org 0
# create a permission here on the 'other' user so they have edit access on the org
@ -993,14 +993,12 @@ class InventoryUpdatesTest(BaseTransactionTest):
self.group = self.inventory.groups.create(name='Cloud Group')
self.inventory2 = self.organization.inventories.create(name='Cloud Inventory 2')
self.group2 = self.inventory2.groups.create(name='Cloud Group 2')
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(InventoryUpdatesTest, self).tearDown()
self.terminate_taskmanager()
self.terminate_queue()
def update_inventory_source(self, group, **kwargs):
inventory_source = group.inventory_source
update_fields = []

View File

@ -447,13 +447,11 @@ class BaseJobTestMixin(BaseTestMixin):
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.populate()
self.start_taskmanager(settings.TASK_COMMAND_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(BaseJobTestMixin, self).tearDown()
self.terminate_taskmanager()
self.terminate_queue()
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
@ -785,7 +783,6 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
# asynchronously; the start API call will update the database, queue the task,
# then return immediately (committing the transaction) before celery has even
# woken up to run the new task.
# FIXME: TODO: These tests are completely broken at the moment, we cover a lot of the run actions in the tasks tests anyway
MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
settings.MIDDLEWARE_CLASSES)
@ -902,133 +899,133 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
# FIXME: Test with other users, test when passwords are required.
# def test_job_cancel(self):
# #job = self.job_ops_east_run
# job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
# url = reverse('api:job_cancel', args=(job.pk,))
def test_job_cancel(self):
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
url = reverse('api:job_cancel', args=(job.pk,))
# # Test with no auth and with invalid login.
# self.check_invalid_auth(url)
# self.check_invalid_auth(url, methods=('post',))
# Test with no auth and with invalid login.
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# # sue can cancel the job, but only when it is pending or running.
# for status in [x[0] for x in TASK_STATUS_CHOICES]:
# if status == 'waiting':
# continue
# job.status = status
# job.save()
# with self.current_user(self.user_sue):
# response = self.get(url)
# if status in ('pending', 'running'):
# self.assertTrue(response['can_cancel'])
# response = self.post(url, {}, expect=202)
# else:
# self.assertFalse(response['can_cancel'])
# response = self.post(url, {}, expect=405)
# sue can cancel the job, but only when it is pending or running.
for status in [x[0] for x in TASK_STATUS_CHOICES]:
if status == 'waiting':
continue
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status in ('pending', 'running'):
self.assertTrue(response['can_cancel'])
response = self.post(url, {}, expect=202)
else:
self.assertFalse(response['can_cancel'])
response = self.post(url, {}, expect=405)
# FIXME: Test with other users.
# def test_get_job_results(self):
# # Start/run a job and then access its results via the API.
# #job = self.job_ops_east_run
# job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
# job.signal_start()
def test_get_job_results(self):
# Start/run a job and then access its results via the API.
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
job.start()
# # Check that the job detail has been updated.
# url = reverse('api:job_detail', args=(job.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# self.assertEqual(response['status'], 'successful',
# response['result_traceback'])
# self.assertTrue(response['result_stdout'])
# Check that the job detail has been updated.
url = reverse('api:job_detail', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(response['status'], 'successful',
response['result_traceback'])
self.assertTrue(response['result_stdout'])
# # Test job events for completed job.
# url = reverse('api:job_job_events_list', args=(job.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = job.job_events.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Test job events for completed job.
url = reverse('api:job_job_events_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# # Test individual job event detail records.
# host_ids = set()
# for job_event in job.job_events.all():
# if job_event.host:
# host_ids.add(job_event.host.pk)
# url = reverse('api:job_event_detail', args=(job_event.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# Test individual job event detail records.
host_ids = set()
for job_event in job.job_events.all():
if job_event.host:
host_ids.add(job_event.host.pk)
url = reverse('api:job_event_detail', args=(job_event.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# # Also test job event list for each host.
# if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
# for host in Host.objects.filter(pk__in=host_ids):
# url = reverse('api:host_job_events_list', args=(host.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = host.job_events.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Also test job event list for each host.
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_events_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# # Test job event list for groups.
# for group in self.inv_ops_east.groups.all():
# url = reverse('api:group_job_events_list', args=(group.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = group.job_events.all()
# self.assertTrue(qs.count(), group)
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Test job event list for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_events_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_events.all()
self.assertTrue(qs.count(), group)
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# # Test global job event list.
# url = reverse('api:job_event_list')
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = JobEvent.objects.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Test global job event list.
url = reverse('api:job_event_list')
with self.current_user(self.user_sue):
response = self.get(url)
qs = JobEvent.objects.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# # Test job host summaries for completed job.
# url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = job.job_host_summaries.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# # Every host referenced by a job_event should be present as a job
# # host summary record.
# self.assertEqual(host_ids,
# set(qs.values_list('host__pk', flat=True)))
# Test job host summaries for completed job.
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Every host referenced by a job_event should be present as a job
# host summary record.
self.assertEqual(host_ids,
set(qs.values_list('host__pk', flat=True)))
# # Test individual job host summary records.
# for job_host_summary in job.job_host_summaries.all():
# url = reverse('api:job_host_summary_detail',
# args=(job_host_summary.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# Test individual job host summary records.
for job_host_summary in job.job_host_summaries.all():
url = reverse('api:job_host_summary_detail',
args=(job_host_summary.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# # Test job host summaries for each host.
# for host in Host.objects.filter(pk__in=host_ids):
# url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = host.job_host_summaries.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Test job host summaries for each host.
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# # Test job host summaries for groups.
# for group in self.inv_ops_east.groups.all():
# url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
# with self.current_user(self.user_sue):
# response = self.get(url)
# qs = group.job_host_summaries.all()
# self.assertTrue(qs.count())
# self.check_pagination_and_size(response, qs.count())
# self.check_list_ids(response, qs)
# Test job host summaries for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
@ -1179,9 +1176,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = jobs_qs[0]
self.assertEqual(job.launch_type, 'callback')
self.assertEqual(job.limit, host.name)
# TODO: Actual job runs are broken in this
#self.assertEqual(job.hosts.count(), 1)
#self.assertEqual(job.hosts.all()[0], host)
self.assertEqual(job.hosts.count(), 1)
self.assertEqual(job.hosts.all()[0], host)
# GET as unauthenticated user will prompt for authentication.
self.get(url, expect=401, remote_addr=host_ip)
@ -1224,9 +1220,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = jobs_qs[0]
self.assertEqual(job.launch_type, 'callback')
self.assertEqual(job.limit, host.name)
# TODO: Actual job runs are broken in this
#self.assertEqual(job.hosts.count(), 1)
#self.assertEqual(job.hosts.all()[0], host)
self.assertEqual(job.hosts.count(), 1)
self.assertEqual(job.hosts.all()[0], host)
# Try using an IP for the host that doesn't resolve via reverse lookup,
# but can be found by doing a forward lookup on the host name.
@ -1250,9 +1245,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = jobs_qs[0]
self.assertEqual(job.launch_type, 'callback')
self.assertEqual(job.limit, host.name)
# TODO: Actual job runs are broken in this
#self.assertEqual(job.hosts.count(), 1)
#self.assertEqual(job.hosts.all()[0], host)
self.assertEqual(job.hosts.count(), 1)
self.assertEqual(job.hosts.all()[0], host)
# Try using address only specified via ansible_ssh_host.
host_qs = job_template.inventory.hosts.order_by('pk')
@ -1265,9 +1259,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = jobs_qs[0]
self.assertEqual(job.launch_type, 'callback')
self.assertEqual(job.limit, host.name)
# TODO: Actual job runs are broken in this
#self.assertEqual(job.hosts.count(), 1)
#self.assertEqual(job.hosts.all()[0], host)
self.assertEqual(job.hosts.count(), 1)
self.assertEqual(job.hosts.all()[0], host)
# Try when hostname is also an IP address, even if a different one is
# specified via ansible_ssh_host.
@ -1293,9 +1286,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = jobs_qs[0]
self.assertEqual(job.launch_type, 'callback')
self.assertEqual(job.limit, host.name)
# TODO: Actual job runs are broken in this
#self.assertEqual(job.hosts.count(), 1)
#self.assertEqual(job.hosts.all()[0], host)
self.assertEqual(job.hosts.count(), 1)
self.assertEqual(job.hosts.all()[0], host)
# Find a new job template to use.
job_template = None

View File

@ -674,17 +674,18 @@ class ProjectsTest(BaseTest):
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local',
UNIT_TEST_IGNORE_TASK_WAIT=True,
PROJECT_UPDATE_IDLE_TIMEOUT=60,
PROJECT_UPDATE_VVV=True)
class ProjectUpdatesTest(BaseTransactionTest):
def setUp(self):
super(ProjectUpdatesTest, self).setUp()
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
self.setup_users()
def tearDown(self):
super(ProjectUpdatesTest, self).tearDown()
self.terminate_queue()
def create_project(self, **kwargs):
cred_fields = ['scm_username', 'scm_password', 'scm_key_data',
@ -1111,10 +1112,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
else:
self.check_project_update(project, should_fail=should_still_fail)
@override_settings(IGNORE_CELERY_INSPECTOR=True)
def test_create_project_with_scm(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com.git')
if not all([scm_url]):
@ -1185,12 +1183,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
}
with self.current_user(self.super_django_user):
self.post(projects_url, project_data, expect=201)
self.terminate_taskmanager()
self.terminate_queue()
def test_public_git_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com.git')
if not all([scm_url]):
@ -1214,12 +1208,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_update(project2)
self.terminate_taskmanager()
self.terminate_queue()
def test_private_git_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_GIT_PRIVATE_HTTPS', '')
scm_username = getattr(settings, 'TEST_GIT_USERNAME', '')
scm_password = getattr(settings, 'TEST_GIT_PASSWORD', '')
@ -1233,12 +1223,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.termiante_queue()
def test_private_git_project_over_ssh(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_GIT_PRIVATE_SSH', '')
scm_key_data = getattr(settings, 'TEST_GIT_KEY_DATA', '')
scm_username = getattr(settings, 'TEST_GIT_USERNAME', '')
@ -1264,59 +1250,50 @@ class ProjectUpdatesTest(BaseTransactionTest):
should_error = bool('github.com' in scm_url and scm_username != 'git')
self.check_project_update(project2, should_fail=None)#,
#should_error=should_error)
self.terminate_taskmanager()
self.terminate_queue()
# TODO: This does not work well with the new task system. Rework.
# @override_settings(IGNORE_CELERY_INSPECTOR=True, DEBUG=True)
# def _test_scm_key_unlock_on_project_update(self):
# self.start_taskmanager(settings.TASK_COMMAND_PORT)
# self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
# scm_url = 'git@github.com:ansible/ansible.github.com.git'
# project = self.create_project(
# name='my git project over ssh with encrypted key',
# scm_type='git',
# scm_url=scm_url,
# scm_key_data=TEST_SSH_KEY_DATA_LOCKED,
# scm_key_unlock=TEST_SSH_KEY_DATA_UNLOCK,
# )
# url = reverse('api:project_update_view', args=(project.pk,))
# with self.current_user(self.super_django_user):
# response = self.get(url, expect=200)
# self.assertTrue(response['can_update'])
# with self.current_user(self.super_django_user):
# response = self.post(url, {}, expect=202)
# time.sleep(15)
# print("PU: " + str(project.project_updates.all()[0].result_traceback))
# project_update = project.project_updates.filter(status='successful').order_by('-pk')[0]
# self.check_project_update(project, should_fail=None,
# project_update=project_update)
# # Verify that we responded to ssh-agent prompt.
# self.assertTrue('Identity added' in project_update.result_stdout,
# project_update.result_stdout)
# # Try again with a bad unlock password.
# project = self.create_project(
# name='my git project over ssh with encrypted key and bad pass',
# scm_type='git',
# scm_url=scm_url,
# scm_key_data=TEST_SSH_KEY_DATA_LOCKED,
# scm_key_unlock='not the right password',
# )
# with self.current_user(self.super_django_user):
# response = self.get(url, expect=200)
# self.assertTrue(response['can_update'])
# with self.current_user(self.super_django_user):
# response = self.post(url, {}, expect=202)
# project_update = project.project_updates.order_by('-pk')[0]
# self.check_project_update(project, should_fail=None,
# project_update=project_update)
# # Verify response to ssh-agent prompt, did not accept password.
# self.assertTrue('Bad passphrase' in project_update.result_stdout,
# project_update.result_stdout)
# self.assertFalse('Identity added' in project_update.result_stdout,
# project_update.result_stdout)
# self.terminate_taskamanger()
# self.terminate_queue()
def test_scm_key_unlock_on_project_update(self):
scm_url = 'git@github.com:ansible/ansible.github.com.git'
project = self.create_project(
name='my git project over ssh with encrypted key',
scm_type='git',
scm_url=scm_url,
scm_key_data=TEST_SSH_KEY_DATA_LOCKED,
scm_key_unlock=TEST_SSH_KEY_DATA_UNLOCK,
)
url = reverse('api:project_update_view', args=(project.pk,))
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.assertTrue(response['can_update'])
with self.current_user(self.super_django_user):
response = self.post(url, {}, expect=202)
time.sleep(15)
project_update = project.project_updates.filter(status='successful').order_by('-pk')[0]
self.check_project_update(project, should_fail=None,
project_update=project_update)
# Verify that we responded to ssh-agent prompt.
self.assertTrue('Identity added' in project_update.result_stdout,
project_update.result_stdout)
# Try again with a bad unlock password.
project = self.create_project(
name='my git project over ssh with encrypted key and bad pass',
scm_type='git',
scm_url=scm_url,
scm_key_data=TEST_SSH_KEY_DATA_LOCKED,
scm_key_unlock='not the right password',
)
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.assertTrue(response['can_update'])
with self.current_user(self.super_django_user):
response = self.post(url, {}, expect=202)
project_update = project.project_updates.order_by('-pk')[0]
self.check_project_update(project, should_fail=None,
project_update=project_update)
# Verify response to ssh-agent prompt, did not accept password.
self.assertTrue('Bad passphrase' in project_update.result_stdout,
project_update.result_stdout)
self.assertFalse('Identity added' in project_update.result_stdout,
project_update.result_stdout)
def create_local_git_repo(self):
repo_dir = tempfile.mkdtemp()
@ -1344,8 +1321,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.check_project_scm(project)
def test_git_project_via_ssh_loopback(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_username = getattr(settings, 'TEST_SSH_LOOPBACK_USERNAME', '')
scm_password = getattr(settings, 'TEST_SSH_LOOPBACK_PASSWORD', '')
if not all([scm_username, scm_password]):
@ -1360,12 +1335,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.termiante_queue()
def test_public_hg_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_HG_PUBLIC_HTTPS',
'https://bitbucket.org/cchurch/django-hotrunner')
if not all([scm_url]):
@ -1389,12 +1360,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_update(project2)
self.terminate_taskmanager()
self.terminate_queue()
def test_private_hg_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_HG_PRIVATE_HTTPS', '')
scm_username = getattr(settings, 'TEST_HG_USERNAME', '')
scm_password = getattr(settings, 'TEST_HG_PASSWORD', '')
@ -1408,12 +1375,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.terminate_queue()
def test_private_hg_project_over_ssh(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_HG_PRIVATE_SSH', '')
scm_key_data = getattr(settings, 'TEST_HG_KEY_DATA', '')
if not all([scm_url, scm_key_data]):
@ -1426,8 +1389,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
)
self.check_project_scm(project)
# hg doesn't support password for ssh:// urls.
self.terminate_taskmanager()
self.terminate_queue()
def create_local_hg_repo(self):
repo_dir = tempfile.mkdtemp()
@ -1472,8 +1433,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.check_project_scm(project)
def test_public_svn_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_SVN_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com')
if not all([scm_url]):
@ -1484,12 +1443,8 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_url=scm_url,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.terminate_queue()
def test_private_svn_project_over_https(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_SVN_PRIVATE_HTTPS', '')
scm_username = getattr(settings, 'TEST_SVN_USERNAME', '')
scm_password = getattr(settings, 'TEST_SVN_PASSWORD', '')
@ -1503,8 +1458,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.terminate_queue()
def create_local_svn_repo(self):
repo_dir = tempfile.mkdtemp()
@ -1533,8 +1486,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.check_project_scm(project)
def test_svn_project_via_ssh_loopback(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_username = getattr(settings, 'TEST_SSH_LOOPBACK_USERNAME', '')
scm_password = getattr(settings, 'TEST_SSH_LOOPBACK_PASSWORD', '')
if not all([scm_username, scm_password]):
@ -1549,8 +1500,6 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
self.check_project_scm(project)
self.terminate_taskmanager()
self.terminate_queue()
def create_test_job_template(self, **kwargs):
opts = {
@ -1626,10 +1575,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
# self.assertTrue(job.status in ('successful', 'failed'))
# self.assertEqual(self.project.project_updates.count(), 3)
@override_settings(IGNORE_CELERY_INSPECTOR=True)
def test_update_on_launch_with_project_passwords(self):
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
scm_url = getattr(settings, 'TEST_GIT_PRIVATE_HTTPS', '')
scm_username = getattr(settings, 'TEST_GIT_USERNAME', '')
scm_password = getattr(settings, 'TEST_GIT_PASSWORD', '')
@ -1682,5 +1628,3 @@ class ProjectUpdatesTest(BaseTransactionTest):
#self.assertEqual(job.status, 'error',
# '\n'.join([job.result_stdout, job.result_traceback]))
self.assertEqual(self.project.project_updates.count(), 4)
self.terminate_taskmanager()
self.terminate_queue()

View File

@ -178,7 +178,6 @@ class RunJobTest(BaseCeleryTest):
self.credential = None
self.cloud_credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_taskmanager(settings.TASK_COMMAND_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
@ -186,7 +185,6 @@ class RunJobTest(BaseCeleryTest):
super(RunJobTest, self).tearDown()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
self.terminate_taskmanager()
self.terminate_queue()
def create_test_credential(self, **kwargs):