1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

AC-1040 Unified jobs updates to get unit tests to pass (hopefully).

This commit is contained in:
Chris Church 2014-03-20 19:16:58 -04:00
parent b970bf54fa
commit 54c533d410
16 changed files with 519 additions and 166 deletions

View File

@ -339,7 +339,8 @@ class ProjectSerializer(BaseSerializer):
'scm_branch', 'scm_clean',
'scm_delete_on_update', 'scm_delete_on_next_update',
'scm_update_on_launch', 'credential',
'last_update_failed', 'status', 'last_updated')
#'last_update_failed', 'status', 'last_updated')
'last_job_failed', 'status', 'last_job_run')
def get_related(self, obj):
if obj is None:
@ -356,12 +357,17 @@ class ProjectSerializer(BaseSerializer):
if obj.credential and obj.credential.active:
res['credential'] = reverse('api:credential_detail',
args=(obj.credential.pk,))
if obj.current_update:
res['current_update'] = reverse('api:project_update_detail',
args=(obj.current_update.pk,))
if obj.last_update:
res['last_update'] = reverse('api:project_update_detail',
args=(obj.last_update.pk,))
#if obj.current_update:
# res['current_update'] = reverse('api:project_update_detail',
#if obj.last_update:
# res['last_update'] = reverse('api:project_update_detail',
# args=(obj.last_update.pk,))
if obj.current_job:
res['current_job'] = reverse('api:project_update_detail',
args=(obj.current_job.pk,))
if obj.last_job:
res['last_job'] = reverse('api:project_update_detail',
args=(obj.last_job.pk,))
return res
def validate_local_path(self, attrs, source):
@ -694,8 +700,9 @@ class InventorySourceSerializer(BaseSerializer):
fields = ('id', 'type', 'url', 'related', 'summary_fields', 'created',
'modified', 'inventory', 'group', 'source', 'source_path',
'source_vars', 'credential', 'source_regions', 'overwrite',
'overwrite_vars', 'update_on_launch', 'update_interval',
'last_update_failed', 'status', 'last_updated')
'overwrite_vars', 'update_on_launch', #'update_interval',
#'last_update_failed', 'status', 'last_updated')
'last_job_failed', 'status', 'last_job_run')
read_only_fields = ('inventory', 'group')
def get_related(self, obj):
@ -716,12 +723,18 @@ class InventorySourceSerializer(BaseSerializer):
if obj.credential and obj.credential.active:
res['credential'] = reverse('api:credential_detail',
args=(obj.credential.pk,))
if obj.current_update:
res['current_update'] = reverse('api:inventory_update_detail',
args=(obj.current_update.pk,))
if obj.last_update:
res['last_update'] = reverse('api:inventory_update_detail',
args=(obj.last_update.pk,))
#if obj.current_update:
# res['current_update'] = reverse('api:inventory_update_detail',
# args=(obj.current_update.pk,))
#if obj.last_update:
# res['last_update'] = reverse('api:inventory_update_detail',
# args=(obj.last_update.pk,))
if obj.current_job:
res['current_job'] = reverse('api:inventory_update_detail',
args=(obj.current_job.pk,))
if obj.last_job:
res['last_job'] = reverse('api:inventory_update_detail',
args=(obj.last_job.pk,))
return res
def get_summary_fields(self, obj):

View File

@ -389,7 +389,7 @@ class ProjectList(ListCreateAPIView):
projects_qs = Project.objects.filter(active=True)
projects_qs = projects_qs.select_related('current_update', 'last_updated')
for project in projects_qs:
project.set_status_and_last_updated()
project._set_status_and_last_job_run()
return super(ProjectList, self).get(request, *args, **kwargs)
class ProjectDetail(RetrieveUpdateDestroyAPIView):

View File

@ -396,7 +396,7 @@ class GroupAccess(BaseAccess):
def get_queryset(self):
qs = self.model.objects.filter(active=True).distinct()
qs = qs.select_related('created_by', 'inventory', 'inventory_source')
qs = qs.select_related('created_by', 'inventory')#, 'inventory_source')
qs = qs.prefetch_related('parents', 'children')
inventories_qs = self.user.get_queryset(Inventory)
return qs.filter(inventory__in=inventories_qs)

View File

@ -23,6 +23,7 @@ class AutoSingleRelatedObjectDescriptor(SingleRelatedObjectDescriptor):
except self.related.model.DoesNotExist:
obj = self.related.model(**{self.related.field.name: instance})
if self.related.field.rel.parent_link:
raise NotImplementedError('not supported with polymorphic!')
for f in instance._meta.local_fields:
setattr(obj, f.name, getattr(instance, f.name))
obj.save()

View File

@ -53,15 +53,36 @@ class Migration(DataMigration):
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.Project._meta.app_label, model=orm.Project._meta.module_name)
for project in orm.Project.objects.order_by('pk'):
d = self._get_dict_from_common_model(project)
d['polymorphic_ctype_id'] = new_ctype.pk
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'local_path': project.local_path,
'scm_type': project.scm_type,
'scm_url': project.scm_url,
'scm_branch': project.scm_branch,
'scm_clean': project.scm_clean,
'scm_delete_on_update': project.scm_delete_on_update,
'credential_id': project.credential_id,
'scm_delete_on_next_update': project.scm_delete_on_next_update,
'scm_update_on_launch': project.scm_update_on_launch,
})
new_project, created = orm.ProjectNew.objects.get_or_create(old_pk=project.pk, defaults=d)
# Copy ProjectUpdate old to new.
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.ProjectUpdate._meta.app_label, model=orm.ProjectUpdate._meta.module_name)
for project_update in orm.ProjectUpdate.objects.order_by('pk'):
project = project_update.project
d = self._get_dict_from_common_task_model(project_update)
d['project_id'] = orm.ProjectNew.objects.get(old_pk=project_update.project_id).pk
d['polymorphic_ctype_id'] = new_ctype.pk
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'project_id': orm.ProjectNew.objects.get(old_pk=project_update.project_id).pk,
'local_path': project.local_path,
'scm_type': project.scm_type,
'scm_url': project.scm_url,
'scm_branch': project.scm_branch,
'scm_clean': project.scm_clean,
'scm_delete_on_update': project.scm_delete_on_update,
'credential_id': project.credential_id,
})
new_project_update, created = orm.ProjectUpdateNew.objects.get_or_create(old_pk=project_update.pk, defaults=d)
# Update Project last run.
@ -71,6 +92,9 @@ class Migration(DataMigration):
new_project.current_job = orm.ProjectUpdateNew.objects.get(old_pk=project.current_update_id)
if project.last_update:
new_project.last_job = orm.ProjectUpdateNew.objects.get(old_pk=project.last_update_id)
new_project.last_job_failed = project.last_update_failed
new_project.last_job_run = project.last_updated
new_project.status = project.status
new_project.save()
# Update Organization projects.
@ -97,15 +121,38 @@ class Migration(DataMigration):
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.InventorySource._meta.app_label, model=orm.InventorySource._meta.module_name)
for inventory_source in orm.InventorySource.objects.order_by('pk'):
d = self._get_dict_from_common_model(inventory_source)
d['polymorphic_ctype_id'] = new_ctype.pk
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'source': inventory_source.source,
'source_path': inventory_source.source_path,
'source_vars': inventory_source.source_vars,
'credential_id': inventory_source.credential_id,
'source_regions': inventory_source.source_regions,
'overwrite': inventory_source.overwrite,
'overwrite_vars': inventory_source.overwrite_vars,
'update_on_launch': inventory_source.update_on_launch,
'inventory_id': inventory_source.inventory_id,
'group_id': inventory_source.group_id,
})
new_inventory_source, created = orm.InventorySourceNew.objects.get_or_create(old_pk=inventory_source.pk, defaults=d)
# Copy InventoryUpdate old to new.
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.InventoryUpdate._meta.app_label, model=orm.InventoryUpdate._meta.module_name)
for inventory_update in orm.InventoryUpdate.objects.order_by('pk'):
inventory_source = inventory_update.inventory_source
d = self._get_dict_from_common_task_model(inventory_update)
d['inventory_source_id'] = orm.InventorySourceNew.objects.get(old_pk=inventory_update.inventory_source_id).pk
d['polymorphic_ctype_id'] = new_ctype.pk
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'source': inventory_source.source,
'source_path': inventory_source.source_path,
'source_vars': inventory_source.source_vars,
'credential_id': inventory_source.credential_id,
'source_regions': inventory_source.source_regions,
'overwrite': inventory_source.overwrite,
'overwrite_vars': inventory_source.overwrite_vars,
'inventory_source_id': orm.InventorySourceNew.objects.get(old_pk=inventory_update.inventory_source_id).pk,
'license_error': inventory_update.license_error,
})
new_inventory_update, created = orm.InventoryUpdateNew.objects.get_or_create(old_pk=inventory_update.pk, defaults=d)
# Update InventorySource last run.
@ -115,6 +162,9 @@ class Migration(DataMigration):
new_inventory_source.current_job = orm.InventoryUpdateNew.objects.get(old_pk=inventory_source.current_update_id)
if inventory_source.last_update:
new_inventory_source.last_job = orm.InventoryUpdateNew.objects.get(old_pk=inventory_source.last_update_id)
new_inventory_source.last_job_failed = inventory_source.last_update_failed
new_inventory_source.last_job_run = inventory_source.last_updated
new_inventory_source.status = inventory_source.status
new_inventory_source.save()
# Update Group inventory_sources.
@ -133,20 +183,57 @@ class Migration(DataMigration):
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.JobTemplate._meta.app_label, model=orm.JobTemplate._meta.module_name)
for job_template in orm.JobTemplate.objects.order_by('pk'):
d = self._get_dict_from_common_model(job_template)
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'job_type': job_template.job_type,
'inventory_id': job_template.inventory_id,
'playbook': job_template.playbook,
'credential_id': job_template.credential_id,
'cloud_credential_id': job_template.cloud_credential_id,
'forks': job_template.forks,
'limit': job_template.limit,
'extra_vars': job_template.extra_vars,
'job_tags': job_template.job_tags,
'host_config_key': job_template.host_config_key,
})
if job_template.project:
d['project_id'] = orm.ProjectNew.objects.get(old_pk=job_template.project_id).pk
d['polymorphic_ctype_id'] = new_ctype.pk
new_job_template, created = orm.JobTemplateNew.objects.get_or_create(old_pk=job_template.pk, defaults=d)
# Copy Job old to new.
new_ctype = orm['contenttypes.ContentType'].objects.get(app_label=orm.Job._meta.app_label, model=orm.Job._meta.module_name)
for job in orm.Job.objects.order_by('pk'):
d = self._get_dict_from_common_task_model(job)
d.update({
'polymorphic_ctype_id': new_ctype.pk,
'job_type': job_template.job_type,
'inventory_id': job_template.inventory_id,
'playbook': job_template.playbook,
'credential_id': job_template.credential_id,
'cloud_credential_id': job_template.cloud_credential_id,
'forks': job_template.forks,
'limit': job_template.limit,
'extra_vars': job_template.extra_vars,
'job_tags': job_template.job_tags,
})
if job.project:
d['project_id'] = orm.ProjectNew.objects.get(old_pk=job.project_id).pk
d['polymorphic_ctype_id'] = new_ctype.pk
if job.job_template:
d['job_template_id'] = orm.JobTemplateNew.objects.get(old_pk=job.job_template_id).pk
new_job, created = orm.JobNew.objects.get_or_create(old_pk=job.pk, defaults=d)
# Update JobTemplate last run.
for new_job_template in orm.JobTemplateNew.objects.order_by('pk'):
try:
new_last_job = new_job_template.jobs.order_by('-pk')[0]
new_job_template.last_job = new_last_job
new_job_template.last_job_failed = new_last_job.failed
new_job_template.last_job_run = new_last_job.finished
new_job_template.status = 'failed' if new_last_job.failed else 'successful'
except IndexError:
new_job_template.status = 'never updated'
new_inventory_source.save()
# Update JobHostSummary job.
for job_host_summary in orm.JobHostSummary.objects.order_by('pk'):
new_job = orm.JobNew.objects.get(old_pk=job_host_summary.job_id)
@ -192,6 +279,7 @@ class Migration(DataMigration):
"Write your backwards methods here."
# FIXME: Would like to have this, but not required.
raise NotImplementedError()
models = {
u'auth.group': {

View File

@ -7,7 +7,7 @@ from django.db import models
class Migration(SchemaMigration):
'''
Rename tables to be consistent with model names.
Rename tables/columns to be consistent with model/field names.
'''
def forwards(self, orm):
@ -19,41 +19,60 @@ class Migration(SchemaMigration):
db.rename_table(u'main_jobnew', 'main_job')
db.rename_table(db.shorten_name(u'main_team_new_projects'), db.shorten_name(u'main_team_projects'))
db.rename_column(db.shorten_name(u'main_team_projects'), 'projectnew_id', 'project_id')
db.rename_table(db.shorten_name(u'main_organization_new_projects'), db.shorten_name(u'main_organization_projects'))
db.rename_column(db.shorten_name(u'main_organization_projects'), 'projectnew_id', 'project_id')
db.rename_column(u'main_permission', 'new_project_id', 'project_id')
db.rename_column(u'main_host', 'new_last_job_id', 'last_job_id')
db.rename_table(db.shorten_name(u'main_host_new_inventory_sources'), db.shorten_name(u'main_host_inventory_sources'))
db.rename_column(db.shorten_name(u'main_host_inventory_sources'), 'inventorysourcenew_id', 'inventorysource_id')
db.rename_table(db.shorten_name(u'main_group_new_inventory_sources'), db.shorten_name(u'main_group_inventory_sources'))
db.rename_column(db.shorten_name(u'main_group_inventory_sources'), 'inventorysourcenew_id', 'inventorysource_id')
db.rename_column(u'main_jobhostsummary', 'new_job_id', 'job_id')
db.rename_column(u'main_jobevent', 'new_job_id', 'job_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_project'), db.shorten_name(u'main_activitystream_project'))
db.rename_column(db.shorten_name(u'main_activitystream_project'), 'projectnew_id', 'project_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_project_update'), db.shorten_name(u'main_activitystream_project_update'))
db.rename_column(db.shorten_name(u'main_activitystream_project_update'), 'projectupdatenew_id', 'projectupdate_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_inventory_source'), db.shorten_name(u'main_activitystream_inventory_source'))
db.rename_column(db.shorten_name(u'main_activitystream_inventory_source'), 'inventorysourcenew_id', 'inventorysource_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_inventory_update'), db.shorten_name(u'main_activitystream_inventory_update'))
db.rename_column(db.shorten_name(u'main_activitystream_inventory_update'), 'inventoryupdatenew_id', 'inventoryupdate_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_job_template'), db.shorten_name(u'main_activitystream_job_template'))
db.rename_column(db.shorten_name(u'main_activitystream_job_template'), 'jobtemplatenew_id', 'jobtemplate_id')
db.rename_table(db.shorten_name(u'main_activitystream_new_job'), db.shorten_name(u'main_activitystream_job'))
db.rename_column(db.shorten_name(u'main_activitystream_job'), 'jobnew_id', 'job_id')
def backwards(self, orm):
db.rename_column(db.shorten_name(u'main_activitystream_job'), 'job_id', 'jobnew_id')
db.rename_table(db.shorten_name(u'main_activitystream_job'), db.shorten_name(u'main_activitystream_new_job'))
db.rename_column(db.shorten_name(u'main_activitystream_job_template'), 'jobtemplate_id', 'jobtemplatenew_id')
db.rename_table(db.shorten_name(u'main_activitystream_job_template'), db.shorten_name(u'main_activitystream_new_job_template'))
db.rename_column(db.shorten_name(u'main_activitystream_inventory_update'), 'inventoryupdate_id', 'inventoryupdatenew_id')
db.rename_table(db.shorten_name(u'main_activitystream_inventory_update'), db.shorten_name(u'main_activitystream_new_inventory_update'))
db.rename_column(db.shorten_name(u'main_activitystream_inventory_source'), 'inventorysource_id', 'inventorysourcenew_id')
db.rename_table(db.shorten_name(u'main_activitystream_inventory_source'), db.shorten_name(u'main_activitystream_new_inventory_source'))
db.rename_column(db.shorten_name(u'main_activitystream_project_update'), 'projectupdate_id', 'projectupdatenew_id')
db.rename_table(db.shorten_name(u'main_activitystream_project_update'), db.shorten_name(u'main_activitystream_new_project_update'))
db.rename_column(db.shorten_name(u'main_activitystream_project'), 'project_id', 'projectnew_id')
db.rename_table(db.shorten_name(u'main_activitystream_project'), db.shorten_name(u'main_activitystream_new_project'))
db.rename_column(u'main_jobevent', 'job_id', 'new_job_id')
db.rename_column(u'main_jobhostsummary', 'job_id', 'new_job_id')
db.rename_column(db.shorten_name(u'main_group_inventory_sources'), 'inventorysource_id', 'inventorysourcenew_id')
db.rename_table(db.shorten_name(u'main_group_inventory_sources'), db.shorten_name(u'main_group_new_inventory_sources'))
db.rename_column(db.shorten_name(u'main_host_inventory_sources'), 'inventorysource_id', 'inventorysourcenew_id')
db.rename_table(db.shorten_name(u'main_host_inventory_sources'), db.shorten_name(u'main_host_new_inventory_sources'))
db.rename_column(u'main_host', 'last_job_id', 'new_last_job_id')
db.rename_column(u'main_permission', 'project_id', 'new_project_id')
db.rename_column(db.shorten_name(u'main_organization_projects'), 'project_id', 'projectnew_id')
db.rename_table(db.shorten_name(u'main_organization_projects'), db.shorten_name(u'main_organization_new_projects'))
db.rename_column(db.shorten_name(u'main_team_projects'), 'project_id', 'projectnew_id')
db.rename_table(db.shorten_name(u'main_team_projects'), db.shorten_name(u'main_team_new_projects'))
db.rename_table(u'main_job', 'main_jobnew')

View File

@ -58,12 +58,12 @@ class ActivityStreamBase(models.Model):
# For compatibility with Django 1.4.x, attempt to handle any calls to
# save that pass update_fields.
try:
super(ActivityStream, self).save(*args, **kwargs)
super(ActivityStreamBase, self).save(*args, **kwargs)
except TypeError:
if 'update_fields' not in kwargs:
raise
kwargs.pop('update_fields')
super(ActivityStream, self).save(*args, **kwargs)
super(ActivityStreamBase, self).save(*args, **kwargs)
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:

View File

@ -129,7 +129,8 @@ class Inventory(CommonModel):
active_groups = self.groups.filter(active=True)
failed_groups = active_groups.filter(has_active_failures=True)
active_inventory_sources = self.inventory_sources.filter(active=True, source__in=CLOUD_INVENTORY_SOURCES)
failed_inventory_sources = active_inventory_sources.filter(last_update_failed=True)
#failed_inventory_sources = active_inventory_sources.filter(last_update_failed=True)
failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True)
computed_fields = {
'has_active_failures': bool(failed_hosts.count()),
'total_hosts': active_hosts.count(),
@ -229,7 +230,7 @@ class HostBase(CommonModelNameNotUnique):
When marking hosts inactive, remove all associations to related
inventory sources.
'''
super(Host, self).mark_inactive(save=save)
super(HostBase, self).mark_inactive(save=save)
self.inventory_sources.clear()
def update_computed_fields(self, update_inventory=True, update_groups=True):
@ -453,7 +454,7 @@ class GroupBase(CommonModelNameNotUnique):
groups/hosts/inventory_sources.
'''
def mark_actual():
super(Group, self).mark_inactive(save=save)
super(GroupBase, self).mark_inactive(save=save)
self.inventory_source.mark_inactive(save=save)
self.inventory_sources.clear()
self.parents.clear()
@ -670,20 +671,6 @@ class InventorySourceOptions(BaseModel):
help_text=_('Overwrite local variables from remote inventory source.'),
)
class InventorySourceBase(InventorySourceOptions):
class Meta:
abstract = True
app_label = 'main'
update_on_launch = models.BooleanField(
default=False,
)
update_cache_timeout = models.PositiveIntegerField(
default=0,
)
@classmethod
def get_ec2_region_choices(cls):
ec2_region_names = getattr(settings, 'EC2_REGION_NAMES', {})
@ -754,57 +741,46 @@ class InventorySourceBase(InventorySourceOptions):
', '.join(invalid_regions)))
return ','.join(regions)
class InventorySourceBase(InventorySourceOptions):
class Meta:
abstract = True
app_label = 'main'
update_on_launch = models.BooleanField(
default=False,
)
update_cache_timeout = models.PositiveIntegerField(
default=0,
)
class InventorySourceBaseMethods(object):
def save(self, *args, **kwargs):
new_instance = not bool(self.pk)
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update status and last_updated fields.
updated_fields = self.set_status_and_last_updated(save=False)
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update inventory from group (if available).
if self.group and not self.inventory:
self.inventory = self.group.inventory
if 'inventory' not in update_fields:
update_fields.append('inventory')
# Set name automatically.
if not self.name:
self.name = 'inventory_source %s' % now()
if 'name' not in update_fields:
update_fields.append('name')
# Do the actual save.
super(InventorySource, self).save(*args, **kwargs)
super(InventorySourceBaseMethods, self).save(*args, **kwargs)
source_vars_dict = VarsDictProperty('source_vars')
def set_status_and_last_updated(self, save=True):
# Determine current status.
if self.source:
if self.current_update:
status = 'updating'
elif not self.last_update:
status = 'never updated'
elif self.last_update_failed:
status = 'failed'
else:
status = 'successful'
else:
status = 'none'
# Determine current last_updated timestamp.
last_updated = None
if self.source and self.last_update:
last_updated = self.last_update.modified
# Update values if changed.
update_fields = []
if self.status != status:
self.status = status
update_fields.append('status')
if self.last_updated != last_updated:
self.last_updated = last_updated
update_fields.append('last_updated')
if save and update_fields:
self.save(update_fields=update_fields)
return update_fields
def get_absolute_url(self):
return reverse('api:inventory_source_detail', args=(self.pk,))
@property
def can_update(self):
def _can_update(self):
# FIXME: Prevent update when another one is active!
return bool(self.source)
@ -820,13 +796,10 @@ class InventorySourceBase(InventorySourceOptions):
inventory_update.start()
return inventory_update
def get_absolute_url(self):
return reverse('api:inventory_source_detail', args=(self.pk,))
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class InventorySource(PrimordialModel, InventorySourceBase):
class InventorySource(InventorySourceBaseMethods, PrimordialModel, InventorySourceBase):
INVENTORY_SOURCE_STATUS_CHOICES = [
('none', _('No External Source')),
@ -886,7 +859,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class InventorySourceNew(UnifiedJobTemplate, InventorySourceBase):
class InventorySourceNew(InventorySourceBaseMethods, UnifiedJobTemplate, InventorySourceBase):
class Meta:
app_label = 'main'
@ -916,7 +889,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class InventorySource(UnifiedJobTemplate, InventorySourceBase):
class InventorySource(InventorySourceBaseMethods, UnifiedJobTemplate, InventorySourceBase):
class Meta:
app_label = 'main'
@ -952,6 +925,9 @@ class InventoryUpdateBase(InventorySourceOptions):
editable=False,
)
class InventoryUpdateBaseMethods(object):
def save(self, *args, **kwargs):
update_fields = kwargs.get('update_fields', [])
if bool('license' in self.result_stdout and
@ -959,7 +935,7 @@ class InventoryUpdateBase(InventorySourceOptions):
self.license_error = True
if 'license_error' not in update_fields:
update_fields.append('license_error')
super(InventoryUpdate, self).save(*args, **kwargs)
super(InventoryUpdateBaseMethods, self).save(*args, **kwargs)
def _get_parent_instance(self):
return self.inventory_source
@ -974,7 +950,7 @@ class InventoryUpdateBase(InventorySourceOptions):
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class InventoryUpdate(CommonTask, InventoryUpdateBase):
class InventoryUpdate(InventoryUpdateBaseMethods, CommonTask, InventoryUpdateBase):
class Meta:
app_label = 'main'
@ -988,7 +964,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class InventoryUpdateNew(UnifiedJob, InventoryUpdateBase):
class InventoryUpdateNew(InventoryUpdateBaseMethods, UnifiedJob, InventoryUpdateBase):
class Meta:
app_label = 'main'
@ -1009,7 +985,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class InventoryUpdate(UnifiedJob, InventoryUpdateBase):
class InventoryUpdate(InventoryUpdateBaseMethods, UnifiedJob, InventoryUpdateBase):
class Meta:
app_label = 'main'

View File

@ -29,6 +29,9 @@ from django.utils.timezone import now, make_aware, get_default_timezone
# Django-JSONField
from jsonfield import JSONField
# Django-Polymorphic
from polymorphic import PolymorphicModel
# AWX
from awx.main.models.base import *
from awx.main.models.unified_jobs import *
@ -138,6 +141,7 @@ class JobTemplateBase(JobOptions):
default='',
)
class JobTemplateBaseMethods(object):
def create_job(self, **kwargs):
'''
@ -181,7 +185,7 @@ class JobTemplateBase(JobOptions):
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class JobTemplate(CommonModel, JobTemplateBase):
class JobTemplate(JobTemplateBaseMethods, CommonModel, JobTemplateBase):
class Meta:
app_label = 'main'
@ -195,7 +199,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class JobTemplateNew(UnifiedJobTemplate, JobTemplateBase):
class JobTemplateNew(JobTemplateBaseMethods, UnifiedJobTemplate, JobTemplateBase):
class Meta:
app_label = 'main'
@ -217,7 +221,8 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class JobTemplate(UnifiedJobTemplate, JobTemplateBase):
#class JobTemplate(JobTemplateBase, UnifiedJobTemplate):
class JobTemplate(JobTemplateBaseMethods, UnifiedJobTemplate, JobTemplateBase):
class Meta:
app_label = 'main'
@ -249,6 +254,8 @@ class JobBase(JobOptions):
through='JobHostSummary',
)
class JobBaseMethods(object):
def get_absolute_url(self):
return reverse('api:job_detail', args=(self.pk,))
@ -374,7 +381,7 @@ class JobBase(JobOptions):
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class Job(CommonTask, JobBase):
class Job(JobBaseMethods, CommonTask, JobBase):
LAUNCH_TYPE_CHOICES = [
('manual', _('Manual')),
@ -408,7 +415,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class JobNew(UnifiedJob, JobBase):
class JobNew(JobBaseMethods, UnifiedJob, JobBase):
class Meta:
app_label = 'main'
@ -437,7 +444,8 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class Job(UnifiedJob, JobBase):
#class Job(JobBase, UnifiedJob):
class Job(JobBaseMethods, UnifiedJob, JobBase):
class Meta:
app_label = 'main'
@ -504,7 +512,7 @@ class JobHostSummaryBase(CreatedModifiedModel):
update_fields = kwargs.get('update_fields', [])
self.failed = bool(self.dark or self.failures)
update_fields.append('failed')
super(JobHostSummary, self).save(*args, **kwargs)
super(JobHostSummaryBase, self).save(*args, **kwargs)
self.update_host_last_job_summary()
def update_host_last_job_summary(self):
@ -856,7 +864,7 @@ class JobEventBase(CreatedModifiedModel):
self.parent = self._find_parent()
if 'parent' not in update_fields:
update_fields.append('parent')
super(JobEvent, self).save(*args, **kwargs)
super(JobEventBase, self).save(*args, **kwargs)
if post_process and not from_parent_update:
self.update_parent_failed_and_changed()
# FIXME: The update_hosts() call (and its queries) are the current

View File

@ -171,6 +171,9 @@ class ProjectBase(ProjectOptions):
default=0,
)
class ProjectBaseMethods(object):
def save(self, *args, **kwargs):
new_instance = not bool(self.pk)
# If update_fields has been specified, add our field names to it,
@ -178,7 +181,7 @@ class ProjectBase(ProjectOptions):
update_fields = kwargs.get('update_fields', [])
# Check if scm_type or scm_url changes.
if self.pk:
project_before = Project.objects.get(pk=self.pk)
project_before = self.__class__.objects.get(pk=self.pk)
if project_before.scm_type != self.scm_type or project_before.scm_url != self.scm_url:
self.scm_delete_on_next_update = True
if 'scm_delete_on_next_update' not in update_fields:
@ -189,13 +192,8 @@ class ProjectBase(ProjectOptions):
self.local_path = u'_%d__%s' % (self.pk, slug_name)
if 'local_path' not in update_fields:
update_fields.append('local_path')
# Update status and last_updated fields.
updated_fields = self.set_status_and_last_updated(save=False)
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Do the actual save.
super(Project, self).save(*args, **kwargs)
super(ProjectBaseMethods, self).save(*args, **kwargs)
if new_instance:
update_fields=[]
# Generate local_path for SCM after initial save (so we have a PK).
@ -207,50 +205,37 @@ class ProjectBase(ProjectOptions):
if new_instance and self.scm_type:
self.update()
def set_status_and_last_updated(self, save=True):
# Determine current status.
def _get_current_status(self):
if self.scm_type:
if self.current_update:
status = 'updating'
elif not self.last_update:
status = 'never updated'
elif self.last_update_failed:
status = 'failed'
return 'updating'
elif not self.last_job:
return 'never updated'
elif self.last_job_failed:
return 'failed'
elif not self.get_project_path():
status = 'missing'
return 'missing'
else:
status = 'successful'
return 'successful'
elif not self.get_project_path():
status = 'missing'
return 'missing'
else:
status = 'ok'
# Determine current last_updated timestamp.
last_updated = None
if self.scm_type and self.last_update:
last_updated = self.last_update.modified
return 'ok'
def _get_last_job_run(self):
if self.scm_type and self.last_job:
return self.last_job.finished
else:
project_path = self.get_project_path()
if project_path:
try:
mtime = os.path.getmtime(project_path)
dt = datetime.datetime.fromtimestamp(mtime)
last_updated = make_aware(dt, get_default_timezone())
return make_aware(dt, get_default_timezone())
except os.error:
pass
# Update values if changed.
update_fields = []
if self.status != status:
self.status = status
update_fields.append('status')
if self.last_updated != last_updated:
self.last_updated = last_updated
update_fields.append('last_updated')
if save and update_fields:
self.save(update_fields=update_fields)
return update_fields
@property
def can_update(self):
def _can_update(self):
# FIXME: Prevent update when another one is active!
return bool(self.scm_type)# and not self.current_update)
@ -312,7 +297,7 @@ class ProjectBase(ProjectOptions):
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class Project(CommonModel, ProjectBase):
class Project(ProjectBaseMethods, CommonModel, ProjectBase):
PROJECT_STATUS_CHOICES = [
('ok', 'OK'),
@ -359,7 +344,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class ProjectNew(UnifiedJobTemplate, ProjectBase):
class ProjectNew(ProjectBaseMethods, UnifiedJobTemplate, ProjectBase):
class Meta:
app_label = 'main'
@ -373,7 +358,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class Project(UnifiedJobTemplate, ProjectBase):
class Project(ProjectBaseMethods, UnifiedJobTemplate, ProjectBase):
class Meta:
app_label = 'main'
@ -388,12 +373,15 @@ class ProjectUpdateBase(ProjectOptions):
app_label = 'main'
abstract = True
def get_absolute_url(self):
return reverse('api:project_update_detail', args=(self.pk,))
class ProjectUpdateBaseMethods(object):
def _get_parent_instance(self):
return self.project
def get_absolute_url(self):
return reverse('api:project_update_detail', args=(self.pk,))
def _get_task_class(self):
from awx.main.tasks import RunProjectUpdate
return RunProjectUpdate
@ -402,25 +390,25 @@ class ProjectUpdateBase(ProjectOptions):
parent_instance = self._get_parent_instance()
if parent_instance:
if self.status in ('pending', 'waiting', 'running'):
if parent_instance.current_update != self:
parent_instance.current_update = self
parent_instance.save(update_fields=['current_update'])
if parent_instance.current_job != self:
parent_instance.current_job = self
parent_instance.save(update_fields=['current_job'])
elif self.status in ('successful', 'failed', 'error', 'canceled'):
if parent_instance.current_update == self:
parent_instance.current_update = None
parent_instance.last_update = self
parent_instance.last_update_failed = self.failed
if parent_instance.current_job == self:
parent_instance.current_job = None
parent_instance.last_job = self
parent_instance.last_job_failed = self.failed
if not self.failed and parent_instance.scm_delete_on_next_update:
parent_instance.scm_delete_on_next_update = False
parent_instance.save(update_fields=['current_update',
'last_update',
'last_update_failed',
parent_instance.save(update_fields=['current_job',
'last_job',
'last_job_failed',
'scm_delete_on_next_update'])
if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
class ProjectUpdate(CommonTask, ProjectUpdateBase):
class ProjectUpdate(ProjectUpdateBaseMethods, CommonTask, ProjectUpdateBase):
class Meta:
app_label = 'main'
@ -434,7 +422,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 0:
if getattr(settings, 'UNIFIED_JOBS_STEP') in (0, 1):
class ProjectUpdateNew(UnifiedJob, ProjectUpdateBase):
class ProjectUpdateNew(ProjectUpdateBaseMethods, UnifiedJob, ProjectUpdateBase):
class Meta:
app_label = 'main'
@ -455,7 +443,7 @@ if getattr(settings, 'UNIFIED_JOBS_STEP') == 1:
if getattr(settings, 'UNIFIED_JOBS_STEP') == 2:
class ProjectUpdate(UnifiedJob, ProjectUpdateBase):
class ProjectUpdate(ProjectUpdateBaseMethods, UnifiedJob, ProjectUpdateBase):
class Meta:
app_label = 'main'

View File

@ -40,3 +40,6 @@ class Schedule(CommonModel):
rrule = models.CharField(
max_length=255,
)
def save(self, *args, **kwargs):
    # No-op override that simply delegates to the parent save().
    # NOTE(review): presumably a placeholder for future schedule bookkeeping
    # (e.g. deriving the next run time from the rrule) -- confirm intent.
    super(Schedule, self).save(*args, **kwargs)

View File

@ -108,8 +108,85 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique):
default='ok',
editable=False,
)
# FIXME: Include code common to Project/InventorySource/JobTemplate
# Backwards-compatible aliases: older call sites still use the "update"
# vocabulary, while the unified-jobs schema stores everything as "jobs".

@property
def current_update(self):
    """Legacy alias for current_job."""
    return self.current_job

@property
def last_update(self):
    """Legacy alias for last_job."""
    return self.last_job

@property
def last_update_failed(self):
    """Legacy alias for last_job_failed."""
    return self.last_job_failed

@property
def last_updated(self):
    """Legacy alias for last_job_run."""
    return self.last_job_run
def save(self, *args, **kwargs):
    """Refresh derived status/last_job_run fields, then save.

    When the caller passes ``update_fields`` we mutate that same list so
    any fields changed here are included in the partial save; without it
    a normal full save writes everything anyway.
    """
    fields_requested = kwargs.get('update_fields', [])
    # Recompute status and last_job_run without triggering a nested save.
    fields_changed = self._set_status_and_last_job_run(save=False)
    for name in fields_changed:
        if name not in fields_requested:
            fields_requested.append(name)
    # Do the actual save.
    super(UnifiedJobTemplate, self).save(*args, **kwargs)
def _get_current_status(self):
    """Derive a human-readable status from the job pointers.

    Override in subclasses as needed.
    """
    if self.current_job:
        return 'updating'
    if not self.last_job:
        return 'never updated'
    return 'failed' if self.last_job_failed else 'successful'
def _get_last_job_run(self):
    """Return the finish time of the most recent job, or None.

    Override in subclasses as needed.
    """
    return self.last_job.finished if self.last_job else None
def _set_status_and_last_job_run(self, save=True):
    """Sync the status and last_job_run fields with the current jobs.

    Returns the list of field names that actually changed; when ``save``
    is true and something changed, a partial save is issued.
    """
    new_status = self._get_current_status()
    new_last_run = self._get_last_job_run()
    changed = []
    if self.status != new_status:
        self.status = new_status
        changed.append('status')
    if self.last_job_run != new_last_run:
        self.last_job_run = new_last_run
        changed.append('last_job_run')
    if save and changed:
        self.save(update_fields=changed)
    return changed
def _can_update(self):
    """Subclass hook; the base template is never updatable.

    Override in subclasses as needed.
    """
    return False

@property
def can_update(self):
    """Public flag wrapping the subclass hook."""
    return self._can_update()
def update_signature(self, **kwargs):
    """Build the celery signature to run an update; subclasses must implement."""
    raise NotImplementedError  # Implement in subclass.

def update(self, **kwargs):
    """Launch an update of this template; subclasses must implement."""
    raise NotImplementedError  # Implement in subclass.

def _get_child_queryset(self):
    """Stub: queryset of jobs spawned from this template (not yet implemented)."""
    pass

def _create_child_instance(self, **kwargs):
    """Stub: create a child job from this template (not yet implemented)."""
    pass
class UnifiedJob(PolymorphicModel, PrimordialModel):
@ -156,7 +233,6 @@ class UnifiedJob(PolymorphicModel, PrimordialModel):
editable=False,
related_name='%(class)s_blocked_by+',
)
cancel_flag = models.BooleanField(
blank=True,
default=False,
@ -229,4 +305,176 @@ class UnifiedJob(PolymorphicModel, PrimordialModel):
default='',
editable=False,
)
# FIXME: Add methods from CommonTask.
def __unicode__(self):
    """Human-readable identifier: '<created>-<id>-<status>'."""
    parts = (self.created, self.id, self.status)
    return u'%s-%s-%s' % parts

def _get_parent_instance(self):
    """Default parent pointer -- presumably overridden by subclasses whose
    parent is not a job template (TODO confirm)."""
    return self.job_template
def _update_parent_instance(self):
    """Keep the parent's current/last job pointers in sync with this job."""
    parent = self._get_parent_instance()
    if not parent:
        return
    if self.status in ('pending', 'waiting', 'running'):
        # Job is active: advertise it as the parent's current job.
        if parent.current_job != self:
            parent.current_job = self
            parent.save(update_fields=['current_job'])
    elif self.status in ('successful', 'failed', 'error', 'canceled'):
        # Terminal state: clear the current pointer and record the outcome.
        if parent.current_job == self:
            parent.current_job = None
            parent.last_job = self
            parent.last_job_failed = self.failed
            parent.save(update_fields=['current_job',
                                       'last_job',
                                       'last_job_failed'])
def save(self, *args, **kwargs):
    """Save the job, deriving failed/started/finished/elapsed from status,
    and sync the parent instance's job pointers when status changes."""
    # If update_fields has been specified, add our field names to it,
    # if it hasn't been specified, then we're just doing a normal save.
    update_fields = kwargs.get('update_fields', [])
    # Get status before save...
    status_before = self.status or 'new'
    if self.pk:
        # Re-read the persisted row so a status transition is detected even
        # when this in-memory instance was mutated after an earlier fetch.
        self_before = self.__class__.objects.get(pk=self.pk)
        if self_before.status != self.status:
            status_before = self_before.status
    # 'failed' is fully derived from the terminal status values.
    failed = bool(self.status in ('failed', 'error', 'canceled'))
    if self.failed != failed:
        self.failed = failed
        if 'failed' not in update_fields:
            update_fields.append('failed')
    # Stamp started/finished only on the first transition into each state.
    if self.status == 'running' and not self.started:
        self.started = now()
        if 'started' not in update_fields:
            update_fields.append('started')
    if self.status in ('successful', 'failed', 'error', 'canceled') and not self.finished:
        self.finished = now()
        if 'finished' not in update_fields:
            update_fields.append('finished')
    if self.started and self.finished and not self.elapsed:
        td = self.finished - self.started
        # Total seconds of the timedelta as a float (spelled out rather
        # than total_seconds(), presumably for Python 2.6 compatibility).
        elapsed = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / (10**6 * 1.0)
    else:
        elapsed = 0.0
    # NOTE(review): once self.elapsed is non-zero, the guard above routes
    # every later save through the else branch, resetting elapsed to 0.0;
    # this looks unintended -- confirm before relying on elapsed persisting.
    if self.elapsed != elapsed:
        self.elapsed = elapsed
        if 'elapsed' not in update_fields:
            update_fields.append('elapsed')
    super(UnifiedJob, self).save(*args, **kwargs)
    # If status changed, update parent instance....
    if self.status != status_before:
        self._update_parent_instance()
def delete(self):
    """Remove the captured-stdout file (if any), then delete the row.

    A missing or unremovable file must not block deletion, but other
    errors should no longer be silently swallowed (the previous broad
    ``except Exception`` hid real bugs and bound an unused variable).
    """
    if self.result_stdout_file != "":
        try:
            os.remove(self.result_stdout_file)
        except OSError:
            # File already gone or not removable; proceed with deletion.
            pass
    super(UnifiedJob, self).delete()
@property
def result_stdout(self):
    """Return this job's captured stdout.

    Prefers the on-disk capture file when one was recorded; falls back to
    the in-database text field. The file is read via a context manager so
    the handle is closed even if the read raises (the original leaked the
    descriptor on error).
    """
    if self.result_stdout_file != "":
        if not os.path.exists(self.result_stdout_file):
            return "stdout capture is missing"
        with open(self.result_stdout_file, "r") as stdout_fd:
            return stdout_fd.read()
    return self.result_stdout_text
@property
def celery_task(self):
    """Look up the TaskMeta row for this job's celery task, or None."""
    task_id = self.celery_task_id
    if not task_id:
        return None
    try:
        return TaskMeta.objects.get(task_id=task_id)
    except TaskMeta.DoesNotExist:
        return None
@property
def can_start(self):
    """A job may only be started while it is still 'new'."""
    return self.status == 'new'
def _get_task_class(self):
    """Return the celery task class that runs this job; subclasses must provide it."""
    raise NotImplementedError

def _get_passwords_needed_to_start(self):
    """Names of passwords that must be supplied at start time.

    None by default; subclasses override.
    """
    return []
def start_signature(self, **kwargs):
    """Mark the job 'pending' and return a celery signature for running it.

    Returns False when the job cannot be started (wrong state, or a
    password named by _get_passwords_needed_to_start() was not supplied
    or is blank in kwargs).
    """
    # Imported here to avoid a circular import with awx.main.tasks.
    # NOTE(review): handle_work_error is imported but never used in this
    # method -- possibly only for the module-import side effect; confirm.
    from awx.main.tasks import handle_work_error
    task_class = self._get_task_class()
    if not self.can_start:
        return False
    needed = self._get_passwords_needed_to_start()
    opts = dict([(field, kwargs.get(field, '')) for field in needed])
    if not all(opts.values()):
        # At least one required password is missing or blank.
        return False
    self.status = 'pending'
    self.save(update_fields=['status'])
    # Commit so the worker (running in a separate transaction) sees the
    # 'pending' row before the task is dispatched.
    transaction.commit()
    task_actual = task_class().si(self.pk, **opts)
    return task_actual
def start(self, **kwargs):
    """Dispatch this job to celery.

    Returns True when the task was dispatched, False when the job could
    not be started (already started, or required passwords missing).
    """
    task_actual = self.start_signature(**kwargs)
    # start_signature() returns False instead of a signature when the job
    # is not startable; previously this fell through to .delay() and
    # raised AttributeError instead of returning False.
    if not task_actual:
        return False
    # TODO: Callback for status
    task_result = task_actual.delay()
    # Reload instance from database so we don't clobber results from task
    # (mainly from tests when using Django 1.4.x).
    instance = self.__class__.objects.get(pk=self.pk)
    # The TaskMeta instance in the database isn't created until the worker
    # starts processing the task, so we can only store the task ID here.
    instance.celery_task_id = task_result.task_id
    instance.save(update_fields=['celery_task_id'])
    return True
@property
def can_cancel(self):
    """Cancellation only applies before the job reaches a terminal state."""
    return self.status in ('pending', 'waiting', 'running')
def _force_cancel(self):
    """Force status to 'canceled' when the worker appears to have vanished.

    Only acts when celery's TaskMeta shows the task was at least seen
    (not merely PENDING) and no worker currently reports it as active,
    reserved, revoked or scheduled -- i.e. celery crashed or the worker
    was killed out from under the job.
    """
    # Celery states indicating the task progressed past the broker queue.
    task_statuses = ('STARTED', 'SUCCESS', 'FAILED', 'RETRY', 'REVOKED')
    try:
        taskmeta = self.celery_task
        if not taskmeta or taskmeta.status not in task_statuses:
            return
        from celery import current_app
        i = current_app.control.inspect()
        # If any worker still knows about the task, it is not orphaned;
        # the four inspect views share identical lookup logic.
        for inspect_view in (i.active, i.reserved, i.revoked, i.scheduled):
            for v in (inspect_view() or {}).values():
                if taskmeta.task_id in [x['id'] for x in v]:
                    return
        instance = self.__class__.objects.get(pk=self.pk)
        if instance.can_cancel:
            instance.status = 'canceled'
            update_fields = ['status']
            if not instance.result_traceback:
                instance.result_traceback = 'Forced cancel'
                update_fields.append('result_traceback')
            instance.save(update_fields=update_fields)
    # Narrowed from a bare ``except:`` which also swallowed SystemExit
    # and KeyboardInterrupt.
    except Exception:  # FIXME: Log this exception!
        if settings.DEBUG:
            raise
def cancel(self):
    """Request cancellation of this job; returns the cancel flag state."""
    if not self.can_cancel:
        return self.cancel_flag
    if not self.cancel_flag:
        self.cancel_flag = True
        self.save(update_fields=['cancel_flag'])
    # With an AMQP broker we can also detect and clean up orphaned tasks.
    if settings.BROKER_URL.startswith('amqp://'):
        self._force_cancel()
    return self.cancel_flag

View File

@ -372,7 +372,16 @@ class BaseTestMixin(object):
self.check_list_ids(response, qs, check_order)
if fields:
for obj in response['results']:
self.assertTrue(set(obj.keys()) <= set(fields))
returned_fields = set(obj.keys())
expected_fields = set(fields)
msg = ''
not_expected = returned_fields - expected_fields
if not_expected:
msg += 'fields %s not expected ' % ', '.join(not_expected)
not_returned = expected_fields - returned_fields
if not_returned:
msg += 'fields %s not returned ' % ', '.join(not_returned)
self.assertTrue(set(obj.keys()) <= set(fields), msg)
def start_queue(self, consumer_port, queue_port):
self.queue_process = Process(target=run_subscriber,

View File

@ -451,7 +451,7 @@ class BaseJobTestMixin(BaseTestMixin):
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
JOB_TEMPLATE_FIELDS = ('id', 'url', 'related', 'summary_fields', 'created',
JOB_TEMPLATE_FIELDS = ('id', 'type', 'url', 'related', 'summary_fields', 'created',
'modified', 'name', 'description', 'job_type',
'inventory', 'project', 'playbook', 'credential',
'cloud_credential', 'forks', 'limit', 'verbosity',

View File

@ -1603,7 +1603,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertTrue(job.status in ('pending', 'waiting'), job.status)
job = Job.objects.get(pk=job.pk)
self.assertTrue(job.status in ('successful', 'failed'),
job.result_stdout + job.result_traceback)
@ -1617,7 +1617,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertTrue(job.status in ('pending', 'waiting'), job.status)
job = Job.objects.get(pk=job.pk)
# FIXME: Not quite sure why the project update still returns successful
# in this case?

View File

@ -269,8 +269,8 @@ TEST_GIT_USERNAME = ''
TEST_GIT_PASSWORD = ''
TEST_GIT_KEY_DATA = TEST_SSH_KEY_DATA
TEST_GIT_PUBLIC_HTTPS = 'https://github.com/ansible/ansible.github.com.git'
TEST_GIT_PRIVATE_HTTPS = 'https://github.com/ansible/ansible-doc.git'
TEST_GIT_PRIVATE_SSH = 'git@github.com:ansible/ansible-doc.git'
TEST_GIT_PRIVATE_HTTPS = 'https://github.com/ansible/product-docs.git'
TEST_GIT_PRIVATE_SSH = 'git@github.com:ansible/product-docs.git'
TEST_HG_USERNAME = ''
TEST_HG_PASSWORD = ''
@ -282,7 +282,7 @@ TEST_HG_PRIVATE_SSH = ''
TEST_SVN_USERNAME = ''
TEST_SVN_PASSWORD = ''
TEST_SVN_PUBLIC_HTTPS = 'https://github.com/ansible/ansible.github.com'
TEST_SVN_PRIVATE_HTTPS = 'https://github.com/ansible/ansible-doc'
TEST_SVN_PRIVATE_HTTPS = 'https://github.com/ansible/product-docs'
# To test repo access via SSH login to localhost.
import getpass