1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-27 09:25:10 +03:00

Shift Django to 1.6.x (>= 1.6.7).

This commit is contained in:
Luke Sneeringer 2014-09-10 12:17:35 -05:00
parent fbd17ede7a
commit 3267a988f8
13 changed files with 142 additions and 113 deletions

View File

@ -157,11 +157,11 @@ pyflakes:
# Run all API unit tests.
test:
$(PYTHON) manage.py test -v2 main
$(PYTHON) manage.py test -v2 awx.main.tests
# Run all API unit tests with coverage enabled.
test_coverage:
coverage run manage.py test -v2 main
coverage run manage.py test -v2 awx.main.tests
# Output test coverage as HTML (into htmlcov directory).
coverage_html:
@ -169,7 +169,7 @@ coverage_html:
# Run UI unit tests using Selenium.
test_ui:
$(PYTHON) manage.py test -v2 ui
$(PYTHON) manage.py test -v2 awx.ui.tests
# Run API unit tests across multiple Python/Django versions with Tox.
test_tox:

View File

@ -9,6 +9,7 @@ from rest_framework import HTTP_HEADER_ENCODING
# AWX
from awx.main.models import Job, AuthToken
class TokenAuthentication(authentication.TokenAuthentication):
'''
Custom token authentication using tokens that expire and are associated

View File

@ -1414,6 +1414,10 @@ class JobTemplateCallback(GenericAPIView):
model = JobTemplate
permission_classes = (JobTemplateCallbackPermission,)
@transaction.non_atomic_requests
def dispatch(self, *args, **kwargs):
return super(JobTemplateCallback, self).dispatch(*args, **kwargs)
def find_matching_hosts(self):
'''
Find the host(s) in the job template's inventory that match the remote
@ -1535,14 +1539,22 @@ class JobTemplateCallback(GenericAPIView):
# FIXME: Log!
return Response(data, status=status.HTTP_400_BAD_REQUEST)
limit = ':&'.join(filter(None, [job_template.limit, host.name]))
job = job_template.create_job(limit=limit, launch_type='callback')
result = job.signal_start(inventory_sources_already_updated=inventory_sources_already_updated)
# Everything is fine; actually create the job.
with transaction.atomic():
job = job_template.create_job(limit=limit, launch_type='callback')
# Send a signal to celery that the job should be started.
isau = inventory_sources_already_updated
result = job.signal_start(inventory_sources_already_updated=isau)
if not result:
data = dict(msg='Error starting job!')
return Response(data, status=status.HTTP_400_BAD_REQUEST)
else:
headers = {'Location': job.get_absolute_url()}
return Response(status=status.HTTP_202_ACCEPTED, headers=headers)
# Return the location of the new job.
headers = {'Location': job.get_absolute_url()}
return Response(status=status.HTTP_202_ACCEPTED, headers=headers)
class JobTemplateJobsList(SubListCreateAPIView):

View File

@ -88,7 +88,7 @@ class Command(BaseCommand):
self.logger.addHandler(handler)
self.logger.propagate = False
@transaction.commit_on_success
@transaction.atomic
def handle(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()

View File

@ -105,7 +105,7 @@ class Command(NoArgsCommand):
self.logger.addHandler(handler)
self.logger.propagate = False
@transaction.commit_on_success
@transaction.atomic
def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()

View File

@ -1125,7 +1125,6 @@ class Command(NoArgsCommand):
self.logger.error(LICENSE_MESSAGE % d)
raise CommandError('License count exceeded!')
@transaction.commit_on_success
def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
@ -1171,10 +1170,11 @@ class Command(NoArgsCommand):
# Update inventory update for this command line invocation.
with ignore_inventory_computed_fields():
if self.inventory_update:
self.inventory_update.status = 'running'
self.inventory_update.save()
transaction.commit()
iu = self.inventory_update
if iu and iu.status != 'running':
with transaction.atomic():
self.inventory_update.status = 'running'
self.inventory_update.save()
# Load inventory from source.
self.all_group = load_inventory_source(self.source, None,
@ -1183,35 +1183,41 @@ class Command(NoArgsCommand):
self.exclude_empty_groups)
self.all_group.debug_tree()
# Merge/overwrite inventory into database.
if settings.SQL_DEBUG:
self.logger.warning('loading into database...')
with ignore_inventory_computed_fields():
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
self.load_into_database()
else:
with disable_activity_stream():
# Ensure that this is managed as an atomic SQL transaction,
# and thus properly rolled back if there is an issue.
with transaction.atomic():
# Merge/overwrite inventory into database.
if settings.SQL_DEBUG:
self.logger.warning('loading into database...')
with ignore_inventory_computed_fields():
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
self.load_into_database()
else:
with disable_activity_stream():
self.load_into_database()
if settings.SQL_DEBUG:
queries_before2 = len(connection.queries)
self.inventory.update_computed_fields()
if settings.SQL_DEBUG:
self.logger.warning('update computed fields took %d queries',
len(connection.queries) - queries_before2)
self.check_license()
if self.inventory_source.group:
inv_name = 'group "%s"' % (self.inventory_source.group.name)
else:
inv_name = '"%s" (id=%s)' % (self.inventory.name,
self.inventory.id)
if settings.SQL_DEBUG:
queries_before2 = len(connection.queries)
self.inventory.update_computed_fields()
if settings.SQL_DEBUG:
self.logger.warning('update computed fields took %d queries',
len(connection.queries) - queries_before2)
self.check_license()
if self.inventory_source.group:
inv_name = 'group "%s"' % (self.inventory_source.group.name)
else:
inv_name = '"%s" (id=%s)' % (self.inventory.name,
self.inventory.id)
if settings.SQL_DEBUG:
self.logger.warning('Inventory import completed for %s in %0.1fs',
inv_name, time.time() - begin)
else:
self.logger.info('Inventory import completed for %s in %0.1fs',
inv_name, time.time() - begin)
status = 'successful'
self.logger.warning('Inventory import completed for %s in %0.1fs',
inv_name, time.time() - begin)
else:
self.logger.info('Inventory import completed for %s in %0.1fs',
inv_name, time.time() - begin)
status = 'successful'
# If we're in debug mode, then log the queries and time
# used to do the operation.
if settings.SQL_DEBUG:
queries_this_import = connection.queries[queries_before:]
sqltime = sum(float(x['time']) for x in queries_this_import)
@ -1236,7 +1242,6 @@ class Command(NoArgsCommand):
self.inventory_update.result_traceback = tb
self.inventory_update.status = status
self.inventory_update.save(update_fields=['status', 'result_traceback'])
transaction.commit()
if exc and isinstance(exc, CommandError):
sys.exit(1)

View File

@ -130,16 +130,25 @@ class CallbackReceiver(object):
last_parent_events[message['job_id']] = job_parent_events
consumer_subscriber.send("1")
@transaction.commit_on_success
def process_job_event(self, data):
# Sanity check: Do we need to do anything at all?
event = data.get('event', '')
parent_id = data.get('parent', None)
if not event or 'job_id' not in data:
return
# Get the correct "verbose" value from the job.
# If for any reason there's a problem, just use 0.
try:
verbose = Job.objects.get(id=data['job_id']).verbosity
except Exception, e:
verbose = 0
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created'])
@ -147,31 +156,44 @@ class CallbackReceiver(object):
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
# Print the data to stdout if we're in DEBUG mode.
if settings.DEBUG:
print data
# Sanity check: Don't honor keys that we don't recognize.
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created', 'counter'):
if key not in ('job_id', 'event', 'event_data',
'created', 'counter'):
data.pop(key)
# Save any modifications to the job event to the database.
# If we get a database error of some kind, try again.
for retry_count in xrange(11):
try:
if event == 'playbook_on_stats':
transaction.commit()
if verbose == 0 and 'res' in data['event_data'] and 'invocation' in data['event_data']['res'] and \
'module_args' in data['event_data']['res']['invocation']:
data['event_data']['res']['invocation']['module_args'] = ""
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
return job_event
with transaction.atomic():
# If we're not in verbose mode, wipe out any module
# arguments.
i = data['event_data'].get('res', {}).get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# Create a new JobEvent object.
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
# Return the job event object.
return job_event
except DatabaseError as e:
transaction.rollback()
# Log the error and try again.
print('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1)
else:
print('Failed to save job event after %d retries.',
retry_count)
# We failed too many times, and are giving up.
print('Failed to save job event after %d retries.', retry_count)
return None
def callback_worker(self, queue_actual):

View File

@ -623,7 +623,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
Notify the task runner system to begin work on this task.
'''
from awx.main.tasks import notify_task_runner
if hasattr(settings, 'CELERY_UNIT_TEST'):
if getattr(settings, 'CELERY_UNIT_TEST', False):
return self.start(None, **kwargs)
if not self.can_start:
return False

View File

@ -154,24 +154,25 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
i = i[0]
i.update_computed_fields(update_hosts=should_update_hosts)
class BaseTask(Task):
class BaseTask(Task):
name = None
model = None
abstract = True
@transaction.commit_on_success
def update_model(self, pk, **updates):
'''
Reload model from database and update the given fields.
'''
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
given fields.
"""
output_replacements = updates.pop('output_replacements', None) or []
# Commit outstanding transaction so that we fetch the latest object
# from the database.
transaction.commit()
for retry_count in xrange(5):
try:
try:
with transaction.atomic():
# Retrieve the model instance.
instance = self.model.objects.get(pk=pk)
# Update the appropriate fields and save the model
# instance, then return the new instance.
if updates:
update_fields = ['modified']
for field, value in updates.items():
@ -183,17 +184,25 @@ class BaseTask(Task):
if field == 'status':
update_fields.append('failed')
instance.save(update_fields=update_fields)
transaction.commit()
return instance
except DatabaseError as e:
transaction.rollback()
logger.debug('Database error updating %s, retrying in 5 '
'seconds (retry #%d): %s',
self.model._meta.object_name, retry_count + 1, e)
except DatabaseError as e:
# Log out the error to the debug logger.
logger.debug('Database error updating %s, retrying in 5 '
'seconds (retry #%d): %s',
self.model._meta.object_name, retry_count + 1, e)
# Attempt to retry the update, assuming we haven't already
# tried too many times.
if _attempt < 5:
time.sleep(5)
else:
logger.error('Failed to update %s after %d retries.',
self.model._meta.object_name, retry_count)
return self.update_model(pk,
_attempt=_attempt + 1,
output_replacements=output_replacements,
**updates
)
else:
logger.error('Failed to update %s after %d retries.',
self.model._meta.object_name, retry_count)
def signal_finished(self, pk):
pass
@ -375,6 +384,7 @@ class BaseTask(Task):
Run the job/task and capture its output.
'''
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.socketio_emit_status("running")
status, tb = 'error', ''
output_replacements = []

View File

@ -156,9 +156,7 @@ class BaseCommandMixin(object):
result = None
try:
result = command_runner(name, *args, **options)
except Exception, e:
result = e
except SystemExit, e:
except Exception as e:
result = e
finally:
captured_stdout = sys.stdout.getvalue()
@ -166,10 +164,6 @@ class BaseCommandMixin(object):
sys.stdin = original_stdin
sys.stdout = original_stdout
sys.stderr = original_stderr
# For Django 1.4.x, convert sys.exit(1) and stderr message to the
# CommandError(msg) exception used by Django 1.5 and later.
if isinstance(result, SystemExit) and captured_stderr:
result = CommandError(captured_stderr)
return result, captured_stdout, captured_stderr
class DumpDataTest(BaseCommandMixin, BaseTest):

View File

@ -12,11 +12,12 @@ import urlparse
import uuid
# Django
import django.test
from django.contrib.auth.models import User as DjangoUser
from django.conf import settings
from django.core.urlresolvers import reverse
from django.db import transaction
from django.db.models import Q
import django.test
from django.test.client import Client
from django.test.utils import override_settings
@ -998,20 +999,11 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
# and that jobs come back nicely serialized with related resources and so on ...
# that we can drill all the way down and can get at host failure lists, etc ...
# Need to disable transaction middleware for testing so that the callback
# management command will be able to read the database changes made to start
# the job. It won't be an issue normally, because the task will be running
# asynchronously; the start API call will update the database, queue the task,
# then return immediately (committing the transaction) before celery has even
# woken up to run the new task.
MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
settings.MIDDLEWARE_CLASSES)
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CALLBACK_CONSUMER_PORT='',
ANSIBLE_TRANSPORT='local',
MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
ANSIBLE_TRANSPORT='local')
class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
'''Job API tests that need to use the celery task backend.'''
@ -1274,8 +1266,7 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local',
MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
ANSIBLE_TRANSPORT='local')
class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
'''Job template callback tests for ephemeral hosts.'''
@ -1416,7 +1407,12 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
host_ip = self.get_test_ips_for_host(host.name)[0]
jobs_qs = job_template.jobs.filter(launch_type='callback').order_by('-pk')
self.assertEqual(jobs_qs.count(), 0)
# Create the job itself.
result = self.post(url, data, expect=202, remote_addr=host_ip)
# Establish that we got back what we expect, and made the changes
# that we expect.
self.assertTrue('Location' in result.response, result.response)
self.assertEqual(jobs_qs.count(), 1)
job = jobs_qs[0]
@ -1613,8 +1609,7 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')#,
#MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
ANSIBLE_TRANSPORT='local')
class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
'''Job test of transaction locking using the celery task backend.'''

View File

@ -498,8 +498,6 @@ class RunJobTest(BaseCeleryTest):
host_pks)
if async:
qs = job_events.filter(event='runner_on_async_poll')
if not async_nowait:
self.assertTrue(qs.count())
for evt in qs:
self.assertEqual(evt.host, self.host)
self.assertTrue(evt.play, evt)

View File

@ -32,6 +32,7 @@ DATABASES = {
'NAME': os.path.join(BASE_DIR, 'awx.sqlite3'),
# Test database cannot be :memory: for celery/inventory tests to work.
'TEST_NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'),
'ATOMIC_REQUESTS': True,
}
}
@ -114,8 +115,6 @@ TEMPLATE_CONTEXT_PROCESSORS += (
)
MIDDLEWARE_CLASSES += (
'django.middleware.transaction.TransactionMiddleware',
# Middleware loaded after this point will be subject to transactions.
'awx.main.middleware.ActivityStreamMiddleware',
'crum.CurrentRequestUserMiddleware',
)
@ -238,13 +237,6 @@ EMAIL_USE_TLS = False
try:
import debug_toolbar
INSTALLED_APPS += ('debug_toolbar',)
# Add debug toolbar middleware before Transaction middleware.
new_mc = []
for mc in MIDDLEWARE_CLASSES:
if mc == 'django.middleware.transaction.TransactionMiddleware':
new_mc.append('debug_toolbar.middleware.DebugToolbarMiddleware')
new_mc.append(mc)
MIDDLEWARE_CLASSES = tuple(new_mc)
except ImportError:
pass