mirror of https://github.com/ansible/awx.git synced 2024-10-27 09:25:10 +03:00

Merge branch 'rampart_groups_setup_playbook' into devel

* rampart_groups_setup_playbook:
  Updating changelog for Instance Groups
  Fix an incorrect reference on instance group jobs list
  Purge remaining references to rampart groups
  Simplify can_access for instance groups on job templates
  Adding Instance Group permissions and tests
  Increase test coverage for task scheduler inventory updates
  Exit logic fixes for instance group tools
  View Fixes for instance groups
  new view to allow associations but no creations
  Updating acceptance documentation and system docs
  Updating unit tests for task manager refactoring
  Update views and serializers to support instance group (ramparts)
  Implementing models for instance groups, updating task manager
  Updating the setup playbook to support instance group installation
  Add nginx to server start and switch back to first tmux win
  Fix an issue where the local queue wouldn't use the rabbitmq name
This commit is contained in:
Matthew Jones 2017-05-12 13:40:30 -04:00
commit 1a4a6273a4
45 changed files with 1501 additions and 1645 deletions

View File

@ -341,6 +341,7 @@ init:
. $(VENV_BASE)/tower/bin/activate; \
fi; \
tower-manage register_instance --hostname=$(COMPOSE_HOST); \
tower-manage register_queue --queuename=tower --hostnames=$(COMPOSE_HOST);\
# Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate
@ -373,10 +374,12 @@ server_noattach:
tmux select-window -t tower:1
tmux rename-window 'WebSockets'
tmux split-window -h 'exec make runworker'
tmux split-window -v 'exec make nginx'
tmux new-window 'exec make receiver'
tmux select-window -t tower:2
tmux rename-window 'Extra Services'
tmux split-window -h 'exec make factcacher'
tmux select-window -t tower:0
server: server_noattach
tmux -2 attach-session -t tower
@ -441,7 +444,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \
fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver

View File

@ -41,7 +41,8 @@ __all__ = ['APIView', 'GenericAPIView', 'ListAPIView', 'SimpleListAPIView',
'SubDetailAPIView',
'ResourceAccessList',
'ParentMixin',
'DeleteLastUnattachLabelMixin',]
'DeleteLastUnattachLabelMixin',
'SubListAttachDetachAPIView',]
logger = logging.getLogger('awx.api.generics')
analytics_logger = logging.getLogger('awx.analytics.performance')
@ -553,6 +554,21 @@ class SubListCreateAttachDetachAPIView(SubListCreateAPIView):
return self.attach(request, *args, **kwargs)
class SubListAttachDetachAPIView(SubListCreateAttachDetachAPIView):
'''
Derived version of SubListCreateAttachDetachAPIView that prohibits creation
'''
def post(self, request, *args, **kwargs):
sub_id = request.data.get('id', None)
if not sub_id:
return Response(
dict(msg=_("{} 'id' field is missing.").format(
self.model._meta.verbose_name.title())),
status=status.HTTP_400_BAD_REQUEST)
return super(SubListAttachDetachAPIView, self).post(request, *args, **kwargs)
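For context, a hedged sketch (not part of this diff) of how the attach-only behavior looks from a client, assuming a running API, an organization and instance group with pk=1, and example credentials:
# Illustrative only: endpoint, ids, and credentials are example values.
from rest_framework.test import APIClient

client = APIClient()
client.login(username='admin', password='password')  # hypothetical account

# Attaching an existing instance group by id is accepted.
resp = client.post('/api/v1/organizations/1/instance_groups/', {'id': 1}, format='json')
assert resp.status_code in (200, 204)

# Omitting 'id' (which would imply creating a new object) is rejected with
# the 400 response produced by SubListAttachDetachAPIView.post().
resp = client.post('/api/v1/organizations/1/instance_groups/', {'name': 'example'}, format='json')
assert resp.status_code == 400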
class DeleteLastUnattachLabelMixin(object):
'''
Models for which you want the last instance to be deleted from the database

View File

@ -104,6 +104,7 @@ SUMMARIZABLE_FK_FIELDS = {
'source_script': ('name', 'description'),
'role': ('id', 'role_field'),
'notification_template': DEFAULT_SUMMARY_FIELDS,
'instance_group': ('id', 'name'),
}
@ -894,6 +895,7 @@ class OrganizationSerializer(BaseSerializer):
notification_templates_error = self.reverse('api:organization_notification_templates_error_list', kwargs={'pk': obj.pk}),
object_roles = self.reverse('api:organization_object_roles_list', kwargs={'pk': obj.pk}),
access_list = self.reverse('api:organization_access_list', kwargs={'pk': obj.pk}),
instance_groups = self.reverse('api:organization_instance_groups_list', kwargs={'pk': obj.pk}),
))
return res
@ -1127,6 +1129,7 @@ class InventorySerializer(BaseSerializerWithVariables):
ad_hoc_commands = self.reverse('api:inventory_ad_hoc_commands_list', kwargs={'pk': obj.pk}),
access_list = self.reverse('api:inventory_access_list', kwargs={'pk': obj.pk}),
object_roles = self.reverse('api:inventory_object_roles_list', kwargs={'pk': obj.pk}),
instance_groups = self.reverse('api:inventory_instance_groups_list', kwargs={'pk': obj.pk}),
))
if obj.organization:
res['organization'] = self.reverse('api:organization_detail', kwargs={'pk': obj.organization.pk})
@ -2330,6 +2333,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO
survey_spec = self.reverse('api:job_template_survey_spec', kwargs={'pk': obj.pk}),
labels = self.reverse('api:job_template_label_list', kwargs={'pk': obj.pk}),
object_roles = self.reverse('api:job_template_object_roles_list', kwargs={'pk': obj.pk}),
instance_groups = self.reverse('api:job_template_instance_groups_list', kwargs={'pk': obj.pk}),
))
if obj.host_config_key:
res['callback'] = self.reverse('api:job_template_callback', kwargs={'pk': obj.pk})
@ -2383,7 +2387,8 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch',
'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
'ask_job_type_on_launch', 'ask_verbosity_on_launch', 'ask_inventory_on_launch',
'ask_credential_on_launch', 'allow_simultaneous', 'artifacts', 'scm_revision',)
'ask_credential_on_launch', 'allow_simultaneous', 'artifacts', 'scm_revision',
'instance_group')
def get_related(self, obj):
res = super(JobSerializer, self).get_related(obj)
@ -3370,6 +3375,37 @@ class ScheduleSerializer(BaseSerializer):
return value
class InstanceSerializer(BaseSerializer):
consumed_capacity = serializers.SerializerMethodField()
class Meta:
model = Instance
fields = ("related", "id", "uuid", "hostname", "created", "modified", "version", "capacity", "consumed_capacity")
def get_related(self, obj):
res = super(InstanceSerializer, self).get_related(obj)
res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk})
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
return res
def get_consumed_capacity(self, obj):
return obj.consumed_capacity
class InstanceGroupSerializer(BaseSerializer):
class Meta:
model = InstanceGroup
fields = ("related", "id", "name", "created", "modified", "capacity", "consumed_capacity")
def get_related(self, obj):
res = super(InstanceGroupSerializer, self).get_related(obj)
res['jobs'] = self.reverse('api:instance_group_unified_jobs_list', kwargs={'pk': obj.pk})
res['instances'] = self.reverse('api:instance_group_instance_list', kwargs={'pk': obj.pk})
return res
class ActivityStreamSerializer(BaseSerializer):
changes = serializers.SerializerMethodField()

View File

@ -26,6 +26,7 @@ organization_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/notification_templates_any/$', 'organization_notification_templates_any_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_error/$', 'organization_notification_templates_error_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_success/$', 'organization_notification_templates_success_list'),
url(r'^(?P<pk>[0-9]+)/instance_groups/$', 'organization_instance_groups_list'),
url(r'^(?P<pk>[0-9]+)/object_roles/$', 'organization_object_roles_list'),
url(r'^(?P<pk>[0-9]+)/access_list/$', 'organization_access_list'),
)
@ -99,6 +100,7 @@ inventory_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/ad_hoc_commands/$', 'inventory_ad_hoc_commands_list'),
url(r'^(?P<pk>[0-9]+)/access_list/$', 'inventory_access_list'),
url(r'^(?P<pk>[0-9]+)/object_roles/$', 'inventory_object_roles_list'),
url(r'^(?P<pk>[0-9]+)/instance_groups/$', 'inventory_instance_groups_list'),
#url(r'^(?P<pk>[0-9]+)/single_fact/$', 'inventory_single_fact_view'),
)
@ -202,6 +204,7 @@ job_template_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/notification_templates_any/$', 'job_template_notification_templates_any_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_error/$', 'job_template_notification_templates_error_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_success/$', 'job_template_notification_templates_success_list'),
url(r'^(?P<pk>[0-9]+)/instance_groups/$', 'job_template_instance_groups_list'),
url(r'^(?P<pk>[0-9]+)/access_list/$', 'job_template_access_list'),
url(r'^(?P<pk>[0-9]+)/object_roles/$', 'job_template_object_roles_list'),
url(r'^(?P<pk>[0-9]+)/labels/$', 'job_template_label_list'),
@ -340,6 +343,20 @@ activity_stream_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/$', 'activity_stream_detail'),
)
instance_urls = patterns('awx.api.views',
url(r'^$', 'instance_list'),
url(r'^(?P<pk>[0-9]+)/$', 'instance_detail'),
url(r'^(?P<pk>[0-9]+)/jobs/$', 'instance_unified_jobs_list'),
url(r'^(?P<pk>[0-9]+)/instance_groups/$', 'instance_instance_groups_list'),
)
instance_group_urls = patterns('awx.api.views',
url(r'^$', 'instance_group_list'),
url(r'^(?P<pk>[0-9]+)/$', 'instance_group_detail'),
url(r'^(?P<pk>[0-9]+)/jobs/$', 'instance_group_unified_jobs_list'),
url(r'^(?P<pk>[0-9]+)/instances/$', 'instance_group_instance_list'),
)
v1_urls = patterns('awx.api.views',
url(r'^$', 'api_v1_root_view'),
url(r'^ping/$', 'api_v1_ping_view'),
@ -350,6 +367,8 @@ v1_urls = patterns('awx.api.views',
url(r'^dashboard/$', 'dashboard_view'),
url(r'^dashboard/graphs/jobs/$','dashboard_jobs_graph_view'),
url(r'^settings/', include('awx.conf.urls')),
url(r'^instances/', include(instance_urls)),
url(r'^instance_groups/', include(instance_group_urls)),
url(r'^schedules/', include(schedule_urls)),
url(r'^organizations/', include(organization_urls)),
url(r'^users/', include(user_urls)),

View File

@ -159,6 +159,8 @@ class ApiVersionRootView(APIView):
data = OrderedDict()
data['authtoken'] = reverse('api:auth_token_view', request=request)
data['ping'] = reverse('api:api_v1_ping_view', request=request)
data['instances'] = reverse('api:instance_list', request=request)
data['instance_groups'] = reverse('api:instance_group_list', request=request)
data['config'] = reverse('api:api_v1_config_view', request=request)
data['settings'] = reverse('api:setting_category_list', request=request)
data['me'] = reverse('api:user_me_list', request=request)
@ -238,6 +240,11 @@ class ApiV1PingView(APIView):
response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified,
capacity=instance.capacity, version=instance.version))
response['instances'].sort()
response['instance_groups'] = []
for instance_group in InstanceGroup.objects.all():
response['instance_groups'].append(dict(name=instance_group.name,
capacity=instance_group.capacity,
instances=[x.hostname for x in instance_group.instances.all()]))
return Response(response)
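A rough sketch of the ping payload shape after this change; all values are illustrative and other fields already returned by the view are omitted:
# Illustrative only: example values, not actual output from this commit.
ping_example = {
    'instances': [
        {'node': 'tower-1.example.com', 'heartbeat': '2017-05-12T17:00:00Z',
         'capacity': 100, 'version': '3.2.0'},
    ],
    'instance_groups': [
        {'name': 'tower', 'capacity': 100,
         'instances': ['tower-1.example.com']},
    ],
}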
@ -491,6 +498,83 @@ class DashboardJobsGraphView(APIView):
return Response(dashboard_data)
class InstanceList(ListAPIView):
view_name = _("Instances")
model = Instance
serializer_class = InstanceSerializer
new_in_320 = True
class InstanceDetail(RetrieveAPIView):
view_name = _("Instance Detail")
model = Instance
serializer_class = InstanceSerializer
new_in_320 = True
class InstanceUnifiedJobsList(SubListAPIView):
view_name = _("Instance Running Jobs")
model = UnifiedJob
serializer_class = UnifiedJobSerializer
parent_model = Instance
new_in_320 = True
def get_queryset(self):
po = self.get_parent_object()
qs = get_user_queryset(self.request.user, UnifiedJob)
qs = qs.filter(execution_node=po.hostname)
return qs
class InstanceInstanceGroupsList(SubListAPIView):
view_name = _("Instance's Instance Groups")
model = InstanceGroup
serializer_class = InstanceGroupSerializer
parent_model = Instance
new_in_320 = True
relationship = 'rampart_groups'
class InstanceGroupList(ListAPIView):
view_name = _("Instance Groups")
model = InstanceGroup
serializer_class = InstanceGroupSerializer
new_in_320 = True
class InstanceGroupDetail(RetrieveAPIView):
view_name = _("Instance Group Detail")
model = InstanceGroup
serializer_class = InstanceGroupSerializer
new_in_320 = True
class InstanceGroupUnifiedJobsList(SubListAPIView):
view_name = _("Instance Group Running Jobs")
model = UnifiedJob
serializer_class = UnifiedJobSerializer
parent_model = InstanceGroup
relationship = "unifiedjob_set"
new_in_320 = True
class InstanceGroupInstanceList(SubListAPIView):
view_name = _("Instance Group's Instances")
model = Instance
serializer_class = InstanceSerializer
parent_model = InstanceGroup
new_in_320 = True
relationship = "instances"
class ScheduleList(ListAPIView):
view_name = _("Schedules")
@ -897,6 +981,15 @@ class OrganizationNotificationTemplatesSuccessList(SubListCreateAttachDetachAPIV
new_in_300 = True
class OrganizationInstanceGroupsList(SubListAttachDetachAPIView):
model = InstanceGroup
serializer_class = InstanceGroupSerializer
parent_model = Organization
relationship = 'instance_groups'
new_in_320 = True
class OrganizationAccessList(ResourceAccessList):
model = User # needs to be User for AccessLists's
@ -942,7 +1035,7 @@ class TeamUsersList(BaseUsersList):
relationship = 'member_role.members'
class TeamRolesList(SubListCreateAttachDetachAPIView):
class TeamRolesList(SubListAttachDetachAPIView):
model = Role
serializer_class = RoleSerializerWithParentAccess
@ -958,11 +1051,9 @@ class TeamRolesList(SubListCreateAttachDetachAPIView):
return Role.filter_visible_roles(self.request.user, team.member_role.children.all().exclude(pk=team.read_role.pk))
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg=_("Role 'id' field is missing."))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(TeamRolesList, self).post(request)
role = get_object_or_400(Role, pk=sub_id)
org_content_type = ContentType.objects.get_for_model(Organization)
@ -1325,7 +1416,7 @@ class UserTeamsList(ListAPIView):
Q(member_role__members=u) | Q(admin_role__members=u)).distinct()
class UserRolesList(SubListCreateAttachDetachAPIView):
class UserRolesList(SubListAttachDetachAPIView):
model = Role
serializer_class = RoleSerializerWithParentAccess
@ -1345,11 +1436,9 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
.exclude(content_type=content_type, object_id=u.id)
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg=_("Role 'id' field is missing."))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(UserRolesList, self).post(request)
if sub_id == self.request.user.admin_role.pk:
raise PermissionDenied(_('You may not perform any action with your own admin_role.'))
@ -1765,6 +1854,15 @@ class InventoryActivityStreamList(ActivityStreamEnforcementMixin, SubListAPIView
return qs.filter(Q(inventory=parent) | Q(host__in=parent.hosts.all()) | Q(group__in=parent.groups.all()))
class InventoryInstanceGroupsList(SubListAttachDetachAPIView):
model = InstanceGroup
serializer_class = InstanceGroupSerializer
parent_model = Inventory
relationship = 'instance_groups'
new_in_320 = True
class InventoryAccessList(ResourceAccessList):
model = User # needs to be User for AccessLists's
@ -2947,6 +3045,15 @@ class JobTemplateJobsList(SubListCreateAPIView):
parent_key = 'job_template'
class JobTemplateInstanceGroupsList(SubListAttachDetachAPIView):
model = InstanceGroup
serializer_class = InstanceGroupSerializer
parent_model = JobTemplate
relationship = 'instance_groups'
new_in_320 = True
class JobTemplateAccessList(ResourceAccessList):
model = User # needs to be User for AccessLists's
@ -4281,7 +4388,7 @@ class RoleDetail(RetrieveAPIView):
new_in_300 = True
class RoleUsersList(SubListCreateAttachDetachAPIView):
class RoleUsersList(SubListAttachDetachAPIView):
model = User
serializer_class = UserSerializer
@ -4298,8 +4405,7 @@ class RoleUsersList(SubListCreateAttachDetachAPIView):
# Forbid implicit user creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg=_("User 'id' field is missing."))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(RoleUsersList, self).post(request)
user = get_object_or_400(User, pk=sub_id)
role = self.get_parent_object()
@ -4323,7 +4429,7 @@ class RoleUsersList(SubListCreateAttachDetachAPIView):
return super(RoleUsersList, self).post(request, *args, **kwargs)
class RoleTeamsList(SubListAPIView):
class RoleTeamsList(SubListAttachDetachAPIView):
model = Team
serializer_class = TeamSerializer
@ -4338,11 +4444,9 @@ class RoleTeamsList(SubListAPIView):
return Team.objects.filter(member_role__children=role)
def post(self, request, pk, *args, **kwargs):
# Forbid implicit team creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg=_("Team 'id' field is missing."))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(RoleTeamsList, self).post(request)
team = get_object_or_400(Team, pk=sub_id)
role = Role.objects.get(pk=self.kwargs['pk'])

View File

@ -387,6 +387,43 @@ class BaseAccess(object):
return False
class InstanceAccess(BaseAccess):
model = Instance
def get_queryset(self):
return Instance.objects.filter(rampart_groups__in=self.user.get_queryset(InstanceGroup))
def can_add(self, data):
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
return False
class InstanceGroupAccess(BaseAccess):
model = InstanceGroup
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
return InstanceGroup.objects.all()
else:
return InstanceGroup.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role'))
def can_add(self, data):
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
return False
class UserAccess(BaseAccess):
'''
I can see user records when:
@ -511,6 +548,18 @@ class OrganizationAccess(BaseAccess):
"active_jobs": active_jobs})
return True
def can_attach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups":
if self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.admin_role:
return True
return False
return super(OrganizationAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups":
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
return super(OrganizationAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
class InventoryAccess(BaseAccess):
'''
@ -581,6 +630,18 @@ class InventoryAccess(BaseAccess):
def can_run_ad_hoc_commands(self, obj):
return self.user in obj.adhoc_role
def can_attach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups":
if self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role:
return True
return False
return super(InventoryAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups":
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
return super(InventoryAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
class HostAccess(BaseAccess):
'''
@ -1238,9 +1299,17 @@ class JobTemplateAccess(BaseAccess):
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
if isinstance(sub_obj, NotificationTemplate):
return self.check_related('organization', Organization, {}, obj=sub_obj, mandatory=True)
if relationship == "instance_groups":
return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.project.organization.admin_role
return super(JobTemplateAccess, self).can_attach(
obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups":
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
return super(JobTemplateAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
class JobAccess(BaseAccess):
'''
@ -2303,3 +2372,5 @@ register_access(WorkflowJobTemplateNode, WorkflowJobTemplateNodeAccess)
register_access(WorkflowJobNode, WorkflowJobNodeAccess)
register_access(WorkflowJobTemplate, WorkflowJobTemplateAccess)
register_access(WorkflowJob, WorkflowJobAccess)
register_access(Instance, InstanceAccess)
register_access(InstanceGroup, InstanceGroupAccess)

View File

@ -0,0 +1,33 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.
import sys
from awx.main.models import Instance, InstanceGroup
from optparse import make_option
from django.core.management.base import BaseCommand
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--queuename', dest='queuename', type='string',
help='Queue to be removed from'),
make_option('--hostname', dest='hostname', type='string',
help='Host to remove from queue'),
)
def handle(self, **options):
ig = InstanceGroup.objects.filter(name=options.get('queuename'))
if not ig.exists():
print("Queue doesn't exist")
sys.exit(1)
ig = ig.first()
i = Instance.objects.filter(hostname=options.get("hostname"))
if not i.exists():
print("Host doesn't exist")
sys.exit(1)
i = i.first()
ig.instances.remove(i)
print("Instance removed from instance group")

View File

@ -1,7 +1,7 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from awx.main.models import Instance
from awx.main.models import Instance, InstanceGroup
from django.core.management.base import NoArgsCommand
@ -13,4 +13,10 @@ class Command(NoArgsCommand):
super(Command, self).__init__()
for instance in Instance.objects.all():
print("hostname: {}; created: {}; heartbeat: {}".format(instance.hostname, instance.created, instance.modified))
print("hostname: {}; created: {}; heartbeat: {}; capacity: {}".format(instance.hostname, instance.created,
instance.modified, instance.capacity))
for instance_group in InstanceGroup.objects.all():
print("Instance Group: {}; created: {}; capacity: {}; members: {}".format(instance_group.name,
instance_group.created,
instance_group.capacity,
[x.hostname for x in instance_group.instances.all()]))

View File

@ -0,0 +1,39 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.
import sys
from awx.main.models import Instance, InstanceGroup
from optparse import make_option
from django.core.management.base import BaseCommand
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--queuename', dest='queuename', type='string',
help='Queue to create/update'),
make_option('--hostnames', dest='hostnames', type='string',
help='Comma-Delimited Hosts to add to the Queue'),
)
def handle(self, **options):
ig = InstanceGroup.objects.filter(name=options.get('queuename'))
if ig.exists():
print("Instance Group already registered {}".format(ig[0]))
ig = ig[0]
else:
print("Creating instance group {}".format(options.get('queuename')))
ig = InstanceGroup(name=options.get('queuename'))
ig.save()
instance_list = [x.strip() for x in options.get('hostnames').split(",")]
for inst_name in instance_list:
instance = Instance.objects.filter(hostname=inst_name)
if instance.exists() and instance[0] not in ig.instances.all():
ig.instances.add(instance[0])
print("Added instance {} to {}".format(instance[0], ig))
elif not instance.exists():
print("Instance does not exist: {}".format(inst_name))
sys.exit(1)
else:
print("Instance already registered {}".format(instance[0]))

View File

@ -0,0 +1,25 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.
import sys
from awx.main.models import InstanceGroup
from optparse import make_option
from django.core.management.base import BaseCommand
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--queuename', dest='queuename', type='string',
help='Queue to create/update'),
)
def handle(self, **options):
ig = InstanceGroup.objects.filter(name=options.get('queuename'))
if not ig.exists():
print("Instance group doesn't exist")
sys.exit(1)
ig = ig.first()
ig.delete()
print("Instance Group Removed")

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0042_v320_drop_v1_credential_fields'),
]
operations = [
migrations.CreateModel(
name='InstanceGroup',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name', models.CharField(unique=True, max_length=250)),
('created', models.DateTimeField(auto_now_add=True)),
('modified', models.DateTimeField(auto_now=True)),
('instances', models.ManyToManyField(help_text='Instances that are members of this InstanceGroup', related_name='rampart_groups', editable=False, to='main.Instance')),
],
),
migrations.AddField(
model_name='inventory',
name='instance_groups',
field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
),
migrations.AddField(
model_name='unifiedjob',
name='instance_group',
field=models.ForeignKey(on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.InstanceGroup', help_text='The Rampart/Instance group the job was run under', null=True),
),
migrations.AddField(
model_name='unifiedjobtemplate',
name='instance_groups',
field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
),
migrations.AddField(
model_name='organization',
name='instance_groups',
field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
),
migrations.AddField(
model_name='activitystream',
name='instance_group',
field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
),
]

View File

@ -64,6 +64,7 @@ class ActivityStream(models.Model):
notification = models.ManyToManyField("Notification", blank=True)
label = models.ManyToManyField("Label", blank=True)
role = models.ManyToManyField("Role", blank=True)
instance_group = models.ManyToManyField("InstanceGroup", blank=True)
def get_absolute_url(self, request=None):
return reverse('api:activity_stream_detail', kwargs={'pk': self.pk}, request=request)

View File

@ -4,6 +4,7 @@
from django.db import models
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from solo.models import SingletonModel
@ -13,13 +14,11 @@ from awx.main.models.jobs import Job
from awx.main.models.projects import ProjectUpdate
from awx.main.models.unified_jobs import UnifiedJob
__all__ = ('Instance', 'JobOrigin', 'TowerScheduleState',)
__all__ = ('Instance', 'InstanceGroup', 'JobOrigin', 'TowerScheduleState',)
class Instance(models.Model):
"""A model representing an Ansible Tower instance, primary or secondary,
running against this database.
"""
"""A model representing an Ansible Tower instance running against this database."""
objects = InstanceManager()
uuid = models.CharField(max_length=40)
@ -35,12 +34,42 @@ class Instance(models.Model):
class Meta:
app_label = 'main'
@property
def consumed_capacity(self):
return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname,
status__in=('running', 'waiting')))
@property
def role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "tower"
class InstanceGroup(models.Model):
"""A model representing a Queue/Group of Tower Instances."""
name = models.CharField(max_length=250, unique=True)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
instances = models.ManyToManyField(
'Instance',
related_name='rampart_groups',
editable=False,
help_text=_('Instances that are members of this InstanceGroup'),
)
@property
def capacity(self):
return sum([x[0] for x in self.instances.values_list('capacity')])
@property
def consumed_capacity(self):
return sum(x.task_impact for x in UnifiedJob.objects.filter(instance_group=self,
status__in=('running', 'waiting')))
class Meta:
app_label = 'main'
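A small sketch of reading the new capacity properties; the group name is an example:
# Illustrative only: inspect a group's aggregate and consumed capacity.
from awx.main.models import InstanceGroup

ig = InstanceGroup.objects.get(name='tower')
print(ig.capacity)            # sum of member instances' capacity
print(ig.consumed_capacity)   # task_impact of running/waiting jobs in this group
for inst in ig.instances.all():
    print(inst.hostname, inst.capacity, inst.consumed_capacity)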
class TowerScheduleState(SingletonModel):
schedule_last_run = models.DateTimeField(auto_now_add=True)

View File

@ -121,7 +121,10 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin):
default=None,
help_text=_('Filter that will be applied to the hosts of this inventory.'),
)
instance_groups = models.ManyToManyField(
'InstanceGroup',
blank=True,
)
admin_role = ImplicitRoleField(
parent_role='organization.admin_role',
)
@ -1367,6 +1370,17 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
def get_notification_friendly_name(self):
return "Inventory Update"
@property
def preferred_instance_groups(self):
if self.inventory_source.inventory is not None and self.inventory_source.inventory.organization is not None:
organization_groups = [x for x in self.inventory_source.inventory.organization.instance_groups.all()]
else:
organization_groups = []
if self.inventory_source.inventory is not None:
inventory_groups = [x for x in self.inventory_source.inventory.instance_groups.all()]
else:
inventory_groups = []
template_groups = [x for x in super(InventoryUpdate, self).preferred_instance_groups]
return template_groups + inventory_groups + organization_groups
def _build_job_explanation(self):
if not self.job_explanation:
return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \

View File

@ -488,7 +488,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
)
@classmethod
def _get_parent_field_name(cls):
return 'job_template'
@ -654,6 +653,25 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
return "$hidden due to Ansible no_log flag$"
return artifacts
@property
def preferred_instance_groups(self):
if self.project is not None and self.project.organization is not None:
organization_groups = [x for x in self.project.organization.instance_groups.all()]
else:
organization_groups = []
if self.inventory is not None:
inventory_groups = [x for x in self.inventory.instance_groups.all()]
else:
inventory_groups = []
if self.job_template is not None:
template_groups = [x for x in self.job_template.instance_groups.all()]
else:
template_groups = []
selected_groups = template_groups + inventory_groups + organization_groups
if not selected_groups:
return super(Job, self).preferred_instance_groups
return selected_groups
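A hedged sketch of how the selection order above plays out for a job; when the template, inventory, and organization define no groups, the base UnifiedJob property falls back to the default 'tower' group:
# Illustrative only: print the instance groups a job would prefer, in order.
from awx.main.models import Job

job = Job.objects.first()
if job is not None:
    for group in job.preferred_instance_groups:
        print(group.name)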
# Job Credential required
@property
def can_start(self):

View File

@ -36,6 +36,10 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
app_label = 'main'
ordering = ('name',)
instance_groups = models.ManyToManyField(
'InstanceGroup',
blank=True,
)
admin_role = ImplicitRoleField(
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
)

View File

@ -516,3 +516,12 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
def get_notification_friendly_name(self):
return "Project Update"
@property
def preferred_instance_groups(self):
if self.project is not None and self.project.organization is not None:
organization_groups = [x for x in self.project.organization.instance_groups.all()]
else:
organization_groups = []
template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
return template_groups + organization_groups

View File

@ -152,6 +152,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
blank=True,
related_name='%(class)s_labels'
)
instance_groups = models.ManyToManyField(
'InstanceGroup',
blank=True,
)
def get_absolute_url(self, request=None):
real_instance = self.get_real_instance()
@ -563,6 +567,14 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
blank=True,
related_name='%(class)s_labels'
)
instance_group = models.ForeignKey(
'InstanceGroup',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
help_text=_('The Rampart/Instance group the job was run under'),
)
def get_absolute_url(self, request=None):
real_instance = self.get_real_instance()
@ -938,9 +950,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return (True, opts)
def start_celery_task(self, opts, error_callback, success_callback):
def start_celery_task(self, opts, error_callback, success_callback, queue):
task_class = self._get_task_class()
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback, queue=queue)
def start(self, error_callback, success_callback, **kwargs):
'''
@ -1048,3 +1060,14 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel()
return self.cancel_flag
@property
def preferred_instance_groups(self):
'''
Return the Instance/Rampart Groups preferred by this unified job's template
'''
from awx.main.models.ha import InstanceGroup
default_instance_group = InstanceGroup.objects.filter(name='tower')
template_groups = [x for x in self.unified_job_template.instance_groups.all()]
if not template_groups and default_instance_group.exists():
return [default_instance_group.first()]
return template_groups

View File

@ -1,4 +1,4 @@
#Copyright (c) 2015 Ansible, Inc.
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
@ -11,6 +11,7 @@ from django.conf import settings
from django.db import transaction, connection
from django.db.utils import DatabaseError
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now as tz_now
# AWX
from awx.main.models import * # noqa
@ -18,17 +19,6 @@ from awx.main.models import * # noqa
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.partial import (
JobDict,
ProjectUpdateDict,
ProjectUpdateLatestDict,
InventoryUpdateDict,
InventoryUpdateLatestDict,
InventorySourceDict,
SystemJobDict,
AdHocCommandDict,
WorkflowJobDict,
)
from awx.main.tasks import _send_notification_templates
# Celery
@ -38,77 +28,104 @@ logger = logging.getLogger('awx.main.scheduler')
class TaskManager():
def __init__(self):
self.graph = DependencyGraph()
self.capacity_total = Instance.objects.total_capacity()
self.capacity_used = 0
self.graph = dict()
for rampart_group in InstanceGroup.objects.all():
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
capacity_total=rampart_group.capacity,
capacity_used=0)
def get_tasks(self):
status_list = ('pending', 'waiting', 'running')
def is_job_blocked(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph;
# doing that again would probably cut down *a lot* of the logic in this task manager class
for g in self.graph:
if self.graph[g]['graph'].is_job_blocked(task):
return True
return False
jobs = JobDict.filter_partial(status=status_list)
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
system_jobs = SystemJobDict.filter_partial(status=status_list)
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
workflow_jobs = WorkflowJobDict.filter_partial(status=status_list)
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list)]
inventory_updates = [i for i in InventoryUpdate.objects.filter(status__in=status_list)]
project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list)]
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list)]
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list)]
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
key=lambda task: task.created)
return all_tasks
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
key=lambda task: task['created'])
return all_actions
@classmethod
def get_node_type(cls, obj):
if type(obj) == Job:
return "job"
elif type(obj) == AdHocCommand:
return "ad_hoc_command"
elif type(obj) == InventoryUpdate:
return "inventory_update"
elif type(obj) == ProjectUpdate:
return "project_update"
elif type(obj) == SystemJob:
return "system_job"
elif type(obj) == WorkflowJob:
return "workflow_job"
return "unknown"
'''
Tasks that are running and SHOULD have a celery task.
'''
def get_running_tasks(self):
status_list = ('running',)
def get_running_tasks(self, all_tasks=None):
if all_tasks is None:
return self.get_tasks(status_list=('running',))
return filter(lambda t: t.status == 'running', all_tasks)
jobs = JobDict.filter_partial(status=status_list)
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
system_jobs = SystemJobDict.filter_partial(status=status_list)
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
'''
Tasks that are currently running in celery
'''
def get_active_tasks(self):
inspector = inspect()
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
active_task_queues = inspector.active()
else:
logger.warn("Ignoring celery task inspector")
active_task_queues = None
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands,
key=lambda task: task['created'])
return all_actions
active_tasks = set()
if active_task_queues is not None:
for queue in active_task_queues:
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
else:
if not hasattr(settings, 'CELERY_UNIT_TEST'):
return (None, None)
return (active_task_queues, active_tasks)
# TODO: Consider a database query for this logic
def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = Set()
for task in all_sorted_tasks:
if type(task) == JobDict:
project_ids.add(task['project_id'])
if isinstance(task, Job):
project_ids.add(task.project_id)
return ProjectUpdate.objects.filter(id__in=project_ids)
return ProjectUpdateLatestDict.filter_partial(list(project_ids))
# TODO: Consider a database query for this logic
def get_latest_inventory_update_tasks(self, all_sorted_tasks):
inventory_ids = Set()
for task in all_sorted_tasks:
if type(task) == JobDict:
inventory_ids.add(task['inventory_id'])
return InventoryUpdateLatestDict.filter_partial(list(inventory_ids))
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return InventoryUpdate.objects.filter(id__in=inventory_ids)
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in
WorkflowJob.objects.filter(status='running')]
return graph_workflow_jobs
# TODO: Consider a database query for this logic
def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids = Set()
results = []
for task in all_sorted_tasks:
if type(task) is JobDict:
inventory_ids.add(task['inventory_id'])
for inventory_id in inventory_ids:
results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id)))
return results
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids)]
def spawn_workflow_graph_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
@ -158,40 +175,21 @@ class TaskManager():
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
return result
def get_active_tasks(self):
inspector = inspect()
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
active_task_queues = inspector.active()
else:
logger.warn("Ignoring celery task inspector")
active_task_queues = None
active_tasks = set()
if active_task_queues is not None:
for queue in active_task_queues:
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
else:
if not hasattr(settings, 'CELERY_UNIT_TEST'):
return (None, None)
return (active_task_queues, active_tasks)
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
def start_task(self, task, dependent_tasks=[]):
def start_task(self, task, rampart_group, dependent_tasks=[]):
from awx.main.tasks import handle_work_error, handle_work_success
task_actual = {
'type':task.get_job_type_str(),
'id': task['id'],
'type':self.get_node_type(task),
'id': task.id,
}
dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks]
dependencies = [{'type': self.get_node_type(t), 'id': t.id} for t in dependent_tasks]
error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
success_handler = handle_work_success.s(task_actual=task_actual)
job_obj = task.get_full()
'''
This is to account for when there isn't enough capacity to execute all
dependent jobs (i.e. proj or inv update) within the same schedule()
@ -202,163 +200,176 @@ class TaskManager():
failure dependency.
'''
if len(dependencies) == 0:
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj)
job_obj.status = 'waiting'
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(task)
task.status = 'waiting'
(start_status, opts) = job_obj.pre_start()
(start_status, opts) = task.pre_start()
if not start_status:
job_obj.status = 'failed'
if job_obj.job_explanation:
job_obj.job_explanation += ' '
job_obj.job_explanation += 'Task failed pre-start check.'
job_obj.save()
task.status = 'failed'
if task.job_explanation:
task.job_explanation += ' '
task.job_explanation += 'Task failed pre-start check.'
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(job_obj) is WorkflowJob:
job_obj.status = 'running'
if type(task) is WorkflowJob:
task.status = 'running'
task.instance_group = rampart_group
task.save()
job_obj.save()
self.consume_capacity(task)
self.consume_capacity(task, rampart_group.name)
def post_commit():
job_obj.websocket_emit_status(job_obj.status)
if job_obj.status != 'failed':
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
task.websocket_emit_status(task.status)
if task.status != 'failed':
task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=rampart_group.name)
connection.on_commit(post_commit)
def process_runnable_tasks(self, runnable_tasks):
map(lambda task: self.graph.add_job(task), runnable_tasks)
def process_running_tasks(self, running_tasks):
map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks)
def create_project_update(self, task):
dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency')
project_task = Project.objects.get(id=task.project_id).create_project_update(launch_type='dependency')
# Project update is created 1 second behind the job
dep.created = task['created'] - timedelta(seconds=1)
dep.status = 'pending'
dep.save()
project_task = ProjectUpdateDict.get_partial(dep.id)
project_task.created = task.created - timedelta(seconds=1)
project_task.status = 'pending'
project_task.save()
return project_task
def create_inventory_update(self, task, inventory_source_task):
dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency')
dep.created = task['created'] - timedelta(seconds=2)
dep.status = 'pending'
dep.save()
inventory_task = InventoryUpdateDict.get_partial(dep.id)
'''
Update internal datastructures with the newly created inventory update
'''
# Should be only 1 inventory update. The one for the job (task)
latest_inventory_updates = self.get_latest_inventory_update_tasks([task])
self.process_latest_inventory_updates(latest_inventory_updates)
inventory_sources = self.get_inventory_source_tasks([task])
self.process_inventory_sources(inventory_sources)
self.graph.add_job(inventory_task)
inventory_task = InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(launch_type='dependency')
inventory_task.created = task.created - timedelta(seconds=2)
inventory_task.status = 'pending'
inventory_task.save()
# inventory_sources = self.get_inventory_source_tasks([task])
# self.process_inventory_sources(inventory_sources)
return inventory_task
'''
Since we are dealing with partial objects we don't get to take advantage
of Django resolving the type of the related many-to-many field dependent_jobs.
Hence the potential double query in this method.
'''
def get_related_dependent_jobs_as_patials(self, job_ids):
dependent_partial_jobs = []
for id in job_ids:
if ProjectUpdate.objects.filter(id=id).exists():
dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial())
elif InventoryUpdate.objects.filter(id=id).exists():
dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial())
return dependent_partial_jobs
def capture_chain_failure_dependencies(self, task, dependencies):
for dep in dependencies:
dep_obj = dep.get_full()
dep_obj.dependent_jobs.add(task['id'])
dep_obj.save()
'''
if not 'dependent_jobs__id' in task.data:
task.data['dependent_jobs__id'] = [dep_obj.data['id']]
else:
task.data['dependent_jobs__id'].append(dep_obj.data['id'])
'''
dep.dependent_jobs.add(task.id)
dep.save()
def should_update_inventory_source(self, job, inventory_source):
now = tz_now()
# Already processed dependencies for this job
if job.dependent_jobs.all():
return False
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("created")
if not latest_inventory_update.exists():
return True
latest_inventory_update = latest_inventory_update.first()
'''
If there's already an inventory update utilizing this job that's about to run
then we don't need to create one
'''
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
return False
timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout)
if (latest_inventory_update.finished + timeout_seconds) < now:
return True
if latest_inventory_update.inventory_source.update_on_launch is True and \
latest_inventory_update.status in ['failed', 'canceled', 'error']:
return True
return False
def should_update_related_project(self, job):
now = tz_now()
if job.dependent_jobs.all():
return False
latest_project_update = ProjectUpdate.objects.filter(project=job.project).order_by("created")
if not latest_project_update.exists():
return True
latest_project_update = latest_project_update.first()
if latest_project_update.status in ['failed', 'canceled']:
return True
'''
If there's already a project update utilizing this job that's about to run
then we don't need to create one
'''
if latest_project_update.status in ['waiting', 'pending', 'running']:
return False
'''
If the latest project update has a created time == job_created_time-1
then consider the project update found. This is so we don't enter an infinite loop
of updating the project when cache timeout is 0.
'''
if latest_project_update.project.scm_update_cache_timeout == 0 and \
latest_project_update.launch_type == 'dependency' and \
latest_project_update.created == job.created - timedelta(seconds=1):
return False
'''
Normal Cache Timeout Logic
'''
timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout)
if (latest_project_update.finished + timeout_seconds) < now:
return True
return False
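A worked sketch of the cache-timeout comparison used in the two helpers above; the timestamps and timeout are made-up values:
# Illustrative only: the update is considered stale once finished + timeout < now.
from datetime import datetime, timedelta

finished = datetime(2017, 5, 12, 17, 0, 0)      # last update finished
cache_timeout = timedelta(seconds=300)          # e.g. scm_update_cache_timeout
now = datetime(2017, 5, 12, 17, 10, 0)

needs_update = (finished + cache_timeout) < now
print(needs_update)  # True: 17:05 is before 17:10, so a new update is needed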
def generate_dependencies(self, task):
dependencies = []
# TODO: What if the project is null ?
if type(task) is JobDict:
if type(task) is Job:
if task['project__scm_update_on_launch'] is True and \
self.graph.should_update_related_project(task):
if task.project.scm_update_on_launch is True and \
self.should_update_related_project(task):
project_task = self.create_project_update(task)
dependencies.append(project_task)
# Inventory created 2 seconds behind job
'''
Inventory may have already been synced from a provision callback.
'''
inventory_sources_already_updated = task.get_inventory_sources_already_updated()
'''
get_inventory_sources() only returns update-on-launch sources
'''
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
if inventory_source_task['id'] in inventory_sources_already_updated:
continue
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
inventory_task = self.create_inventory_update(task, inventory_source_task)
dependencies.append(inventory_task)
if task.launch_type != 'callback':
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
if self.should_update_inventory_source(task, inventory_source):
inventory_task = self.create_inventory_update(task, inventory_source)
dependencies.append(inventory_task)
self.capture_chain_failure_dependencies(task, dependencies)
return dependencies
def process_latest_project_updates(self, latest_project_updates):
map(lambda task: self.graph.add_latest_project_update(task), latest_project_updates)
def process_latest_inventory_updates(self, latest_inventory_updates):
map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates)
def process_inventory_sources(self, inventory_id_sources):
map(lambda (inventory_id, inventory_sources): self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources)
def process_dependencies(self, dependent_task, dependency_tasks):
for task in dependency_tasks:
# ProjectUpdate or InventoryUpdate may be blocked by another of
# the same type.
if not self.graph.is_job_blocked(task):
self.graph.add_job(task)
if not self.would_exceed_capacity(task):
self.start_task(task, [dependent_task])
else:
self.graph.add_job(task)
if self.is_job_blocked(task):
logger.debug("Dependent task {} is blocked from running".format(task))
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
for rampart_group in preferred_instance_groups:
if self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue
if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting dependent task {} in group {}".format(task, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, dependency_tasks)
found_acceptable_queue = True
break
if not found_acceptable_queue:
logger.debug("Dependent task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
def process_pending_tasks(self, pending_tasks):
for task in pending_tasks:
# Stop processing tasks if we know we are out of capacity
if self.get_remaining_capacity() <= 0:
return
if not self.graph.is_job_blocked(task):
dependencies = self.generate_dependencies(task)
self.process_dependencies(task, dependencies)
# Spawning deps might have blocked us
if not self.graph.is_job_blocked(task):
self.graph.add_job(task)
if not self.would_exceed_capacity(task):
self.start_task(task)
else:
self.graph.add_job(task)
self.process_dependencies(task, self.generate_dependencies(task))
if self.is_job_blocked(task):
logger.debug("Task {} is blocked from running".format(task))
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
for rampart_group in preferred_instance_groups:
if self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue
if not self.would_exceed_capacity(task, rampart_group.name):
logger.debug("Starting task {} in group {}".format(task, rampart_group.name))
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group)
found_acceptable_queue = True
break
if not found_acceptable_queue:
logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
'''
@ -366,66 +377,68 @@ class TaskManager():
'''
for task in all_running_sorted_tasks:
if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# TODO: try catch the getting of the job. The job COULD have been deleted
task_obj = task.get_full()
# Ensure job did not finish running between the time we get the
# list of task id's from celery and now.
# Note: This is an actual fix, not a reduction in the time
# window that this can happen.
if task_obj.status != 'running':
if task.status != 'running':
continue
task_obj.status = 'failed'
task_obj.job_explanation += ' '.join((
task.status = 'failed'
task.job_explanation += ' '.join((
'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.',
))
task_obj.save()
_send_notification_templates(task_obj, 'failed')
task_obj.websocket_emit_status('failed')
task.save()
_send_notification_templates(task, 'failed')
task.websocket_emit_status('failed')
logger.error("Task %s appears orphaned... marking as failed" % task)
def calculate_capacity_used(self, tasks):
self.capacity_used = 0
for rampart_group in self.graph:
self.graph[rampart_group]['capacity_used'] = 0
for t in tasks:
self.capacity_used += t.task_impact()
for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
if group_actual[0] in self.graph:
self.graph[group_actual[0]]['capacity_used'] += t.task_impact
def would_exceed_capacity(self, task):
if self.capacity_used == 0:
def would_exceed_capacity(self, task, instance_group):
current_capacity = self.graph[instance_group]['capacity_used']
capacity_total = self.graph[instance_group]['capacity_total']
if current_capacity == 0:
return False
return (task.task_impact() + self.capacity_used > self.capacity_total)
return (task.task_impact + current_capacity > capacity_total)
def consume_capacity(self, task):
self.capacity_used += task.task_impact()
def consume_capacity(self, task, instance_group):
self.graph[instance_group]['capacity_used'] += task.task_impact
def get_remaining_capacity(self):
return (self.capacity_total - self.capacity_used)
def get_remaining_capacity(self, instance_group):
return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['capacity_used'])
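A minimal sketch of the per-group capacity bookkeeping the methods above implement; the group name and numbers are examples:
# Illustrative only: mirrors the per-group accounting kept in self.graph.
graph = {'tower': {'capacity_total': 200, 'capacity_used': 150}}

def would_exceed(task_impact, group):
    used = graph[group]['capacity_used']
    total = graph[group]['capacity_total']
    if used == 0:          # the first task always gets to start
        return False
    return task_impact + used > total

print(would_exceed(60, 'tower'))   # True: 60 + 150 > 200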
def process_tasks(self, all_sorted_tasks):
running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks)
runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks)
running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)
self.calculate_capacity_used(running_tasks)
self.process_runnable_tasks(runnable_tasks)
self.process_running_tasks(running_tasks)
pending_tasks = filter(lambda t: t['status'] in 'pending', all_sorted_tasks)
pending_tasks = filter(lambda t: t.status in 'pending', all_sorted_tasks)
self.process_pending_tasks(pending_tasks)
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
if len(all_sorted_tasks) > 0:
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
self.process_latest_project_updates(latest_project_updates)
# TODO: Deal with
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
# self.process_latest_project_updates(latest_project_updates)
latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
self.process_latest_inventory_updates(latest_inventory_updates)
# latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
# self.process_latest_inventory_updates(latest_inventory_updates)
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_inventory_sources(inventory_id_sources)
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
running_workflow_tasks = self.get_running_workflow_jobs()
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
@ -436,6 +449,7 @@ class TaskManager():
return finished_wfjs
def schedule(self):
logger.debug("Starting Schedule")
with transaction.atomic():
# Lock
try:
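
The capacity logic in this change replaces a single global counter with per-instance-group bookkeeping in self.graph. A minimal, self-contained sketch of that accounting pattern follows; the class and names are illustrative, not the actual TaskManager API, and it only assumes that tasks expose a task_impact property:

# Hedged sketch of per-instance-group capacity accounting; the class name and
# structure are illustrative, not the real TaskManager/graph implementation.
class CapacityLedger(object):
    def __init__(self, groups):
        # groups: {group_name: total_capacity}
        self.graph = dict(
            (name, {'capacity_total': total, 'capacity_used': 0})
            for name, total in groups.items()
        )

    def consume_capacity(self, task, group):
        self.graph[group]['capacity_used'] += task.task_impact

    def would_exceed_capacity(self, task, group):
        node = self.graph[group]
        if node['capacity_used'] == 0:
            # An idle group always accepts at least one task, even if that
            # task alone is larger than the group's total capacity.
            return False
        return task.task_impact + node['capacity_used'] > node['capacity_total']

    def get_remaining_capacity(self, group):
        node = self.graph[group]
        return node['capacity_total'] - node['capacity_used']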

View File

@ -5,7 +5,6 @@ from awx.main.models import (
InventoryUpdate,
ProjectUpdate,
WorkflowJob,
SystemJob,
)
@ -86,21 +85,6 @@ class SimpleDAG(object):
return idx
return None
def get_node_type(self, obj):
if type(obj) == Job:
return "job"
elif type(obj) == AdHocCommand:
return "ad_hoc_command"
elif type(obj) == InventoryUpdate:
return "inventory_update"
elif type(obj) == ProjectUpdate:
return "project_update"
elif type(obj) == SystemJob:
return "system_job"
elif type(obj) == WorkflowJob:
return "workflow_job"
return "unknown"
def get_dependencies(self, obj, label=None):
antecedents = []
this_ord = self.find_ord(obj)

View File

@ -1,13 +1,12 @@
from datetime import timedelta
from django.utils.timezone import now as tz_now
from awx.main.scheduler.partial import (
JobDict,
ProjectUpdateDict,
InventoryUpdateDict,
SystemJobDict,
AdHocCommandDict,
WorkflowJobDict,
from awx.main.models import (
Job,
ProjectUpdate,
InventoryUpdate,
SystemJob,
AdHocCommand,
WorkflowJob,
)
@ -28,7 +27,8 @@ class DependencyGraph(object):
INVENTORY_SOURCES = 'inventory_source_ids'
def __init__(self, *args, **kwargs):
def __init__(self, queue):
self.queue = queue
self.data = {}
# project_id -> True / False
self.data[self.PROJECT_UPDATES] = {}
@ -53,7 +53,7 @@ class DependencyGraph(object):
# workflow_job_template_id -> True / False
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {}
# project_id -> latest ProjectUpdateLatestDict
# project_id -> latest ProjectUpdateLatestDict'
self.data[self.LATEST_PROJECT_UPDATES] = {}
# inventory_source_id -> latest InventoryUpdateLatestDict
self.data[self.LATEST_INVENTORY_UPDATES] = {}
@ -62,89 +62,16 @@ class DependencyGraph(object):
self.data[self.INVENTORY_SOURCES] = {}
def add_latest_project_update(self, job):
self.data[self.LATEST_PROJECT_UPDATES][job['project_id']] = job
def add_latest_inventory_update(self, job):
self.data[self.LATEST_INVENTORY_UPDATES][job['inventory_source_id']] = job
def add_inventory_sources(self, inventory_id, inventory_sources):
self.data[self.INVENTORY_SOURCES][inventory_id] = inventory_sources
def get_inventory_sources(self, inventory_id):
return self.data[self.INVENTORY_SOURCES].get(inventory_id, [])
self.data[self.LATEST_PROJECT_UPDATES][job.project_id] = job
def get_now(self):
return tz_now()
'''
JobDict
Presume that the job is related to a project that is updated on launch
'''
def should_update_related_project(self, job):
now = self.get_now()
# Already processed dependencies for this job
if job.data['dependent_jobs__id'] is not None:
return False
latest_project_update = self.data[self.LATEST_PROJECT_UPDATES].get(job['project_id'], None)
if not latest_project_update:
return True
# TODO: Other finished/failed cases, e.g. error?
if latest_project_update['status'] in ['failed', 'canceled']:
return True
'''
This is a bit of fuzzy logic.
If the latest project update was created exactly one second before the job
(created == job_created_time - 1), treat that update as the dependency already
spawned for this job, so we don't enter an infinite loop of project updates
when the cache timeout is 0.
'''
if latest_project_update['project__scm_update_cache_timeout'] == 0 and \
latest_project_update['launch_type'] == 'dependency' and \
latest_project_update['created'] == job['created'] - timedelta(seconds=1):
return False
'''
Normal, expected, cache timeout logic
'''
timeout_seconds = timedelta(seconds=latest_project_update['project__scm_update_cache_timeout'])
if (latest_project_update['finished'] + timeout_seconds) < now:
return True
return False
def should_update_related_inventory_source(self, job, inventory_source_id):
now = self.get_now()
# Already processed dependencies for this job
if job.data['dependent_jobs__id'] is not None:
return False
latest_inventory_update = self.data[self.LATEST_INVENTORY_UPDATES].get(inventory_source_id, None)
if not latest_inventory_update:
return True
'''
Normal, expected, cache timeout logic
'''
timeout_seconds = timedelta(seconds=latest_inventory_update['inventory_source__update_cache_timeout'])
if (latest_inventory_update['finished'] + timeout_seconds) < now:
return True
if latest_inventory_update['inventory_source__update_on_launch'] is True and \
latest_inventory_update['status'] in ['failed', 'canceled', 'error']:
return True
return False
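
Both of the checks above reduce to comparing finished + cache_timeout against now. A tiny numeric illustration of that comparison, using plain datetimes with made-up values rather than the real job/update dicts:

# Hedged numeric illustration of the cache-timeout comparison used above;
# the values are invented, not taken from a real job or update.
from datetime import datetime, timedelta

now = datetime(2017, 5, 1, 12, 0, 0)
last_update_finished = now - timedelta(seconds=120)
cache_timeout = timedelta(seconds=60)

# The last update's cache expired 60 seconds ago, so a new update is needed.
assert (last_update_finished + cache_timeout) < now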
def mark_system_job(self):
self.data[self.SYSTEM_JOB] = False
def mark_project_update(self, job):
self.data[self.PROJECT_UPDATES][job['project_id']] = False
self.data[self.PROJECT_UPDATES][job.project_id] = False
def mark_inventory_update(self, inventory_id):
self.data[self.INVENTORY_UPDATES][inventory_id] = False
@ -153,69 +80,69 @@ class DependencyGraph(object):
self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False
def mark_job_template_job(self, job):
self.data[self.JOB_INVENTORY_IDS][job['inventory_id']] = False
self.data[self.JOB_PROJECT_IDS][job['project_id']] = False
self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False
self.data[self.JOB_INVENTORY_IDS][job.inventory_id] = False
self.data[self.JOB_PROJECT_IDS][job.project_id] = False
self.data[self.JOB_TEMPLATE_JOBS][job.job_template_id] = False
def mark_workflow_job(self, job):
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job.workflow_job_template_id] = False
def can_project_update_run(self, job):
return self.data[self.JOB_PROJECT_IDS].get(job['project_id'], True) and \
self.data[self.PROJECT_UPDATES].get(job['project_id'], True)
return self.data[self.JOB_PROJECT_IDS].get(job.project_id, True) and \
self.data[self.PROJECT_UPDATES].get(job.project_id, True)
def can_inventory_update_run(self, job):
return self.data[self.JOB_INVENTORY_IDS].get(job['inventory_source__inventory_id'], True) and \
self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True)
return self.data[self.JOB_INVENTORY_IDS].get(job.inventory_source.inventory_id, True) and \
self.data[self.INVENTORY_SOURCE_UPDATES].get(job.inventory_source_id, True)
def can_job_run(self, job):
if self.data[self.PROJECT_UPDATES].get(job['project_id'], True) is True and \
self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) is True:
if job['allow_simultaneous'] is False:
return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True)
if self.data[self.PROJECT_UPDATES].get(job.project_id, True) is True and \
self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True) is True:
if job.allow_simultaneous is False:
return self.data[self.JOB_TEMPLATE_JOBS].get(job.job_template_id, True)
else:
return True
return False
def can_workflow_job_run(self, job):
if job['allow_simultaneous'] is True:
if job.allow_simultaneous:
return True
return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job['workflow_job_template_id'], True)
return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job.workflow_job_template_id, True)
def can_system_job_run(self):
return self.data[self.SYSTEM_JOB]
def can_ad_hoc_command_run(self, job):
return self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True)
return self.data[self.INVENTORY_UPDATES].get(job.inventory_id, True)
def is_job_blocked(self, job):
if type(job) is ProjectUpdateDict:
if type(job) is ProjectUpdate:
return not self.can_project_update_run(job)
elif type(job) is InventoryUpdateDict:
elif type(job) is InventoryUpdate:
return not self.can_inventory_update_run(job)
elif type(job) is JobDict:
elif type(job) is Job:
return not self.can_job_run(job)
elif type(job) is SystemJobDict:
elif type(job) is SystemJob:
return not self.can_system_job_run()
elif type(job) is AdHocCommandDict:
elif type(job) is AdHocCommand:
return not self.can_ad_hoc_command_run(job)
elif type(job) is WorkflowJobDict:
elif type(job) is WorkflowJob:
return not self.can_workflow_job_run(job)
def add_job(self, job):
if type(job) is ProjectUpdateDict:
if type(job) is ProjectUpdate:
self.mark_project_update(job)
elif type(job) is InventoryUpdateDict:
self.mark_inventory_update(job['inventory_source__inventory_id'])
self.mark_inventory_source_update(job['inventory_source_id'])
elif type(job) is JobDict:
elif type(job) is InventoryUpdate:
self.mark_inventory_update(job.inventory_source.inventory_id)
self.mark_inventory_source_update(job.inventory_source_id)
elif type(job) is Job:
self.mark_job_template_job(job)
elif type(job) is WorkflowJobDict:
elif type(job) is WorkflowJob:
self.mark_workflow_job(job)
elif type(job) is SystemJobDict:
elif type(job) is SystemJob:
self.mark_system_job()
elif type(job) is AdHocCommandDict:
self.mark_inventory_update(job['inventory_id'])
elif type(job) is AdHocCommand:
self.mark_inventory_update(job.inventory_id)
def add_jobs(self, jobs):
map(lambda j: self.add_job(j), jobs)
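
The blocking rules above all follow one pattern: each category is a dict keyed by the relevant id, a missing key means "nothing running, not blocked", and add_job flips keys to False. A stripped-down sketch of that pattern, with illustrative names rather than the real DependencyGraph API:

# Stripped-down sketch of the mark/check pattern above; attribute and method
# names are illustrative, not the actual DependencyGraph implementation.
class MiniDependencyGraph(object):
    def __init__(self):
        self.project_updates = {}   # project_id -> False while an update runs
        self.jt_jobs = {}           # job_template_id -> False while a job runs

    def add_job(self, job):
        self.project_updates[job.project_id] = False
        self.jt_jobs[job.job_template_id] = False

    def can_job_run(self, job):
        # Missing keys default to True, i.e. "not blocked".
        return (self.project_updates.get(job.project_id, True) and
                self.jt_jobs.get(job.job_template_id, True))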

View File

@ -1,274 +0,0 @@
# Python
import json
import itertools
# AWX
from awx.main.utils import decrypt_field_value
from awx.main.models import (
Job,
ProjectUpdate,
InventoryUpdate,
InventorySource,
SystemJob,
AdHocCommand,
WorkflowJob,
)
class PartialModelDict(object):
FIELDS = ()
model = None
data = None
def __init__(self, data):
if type(data) is not dict:
raise RuntimeError("Expected data to be of type dict not %s" % type(data))
self.data = data
def __getitem__(self, index):
return self.data[index]
def __setitem__(self, key, value):
self.data[key] = value
def get(self, key, **kwargs):
return self.data.get(key, **kwargs)
def get_full(self):
return self.model.objects.get(id=self.data['id'])
def refresh_partial(self):
return self.__class__(self.model.objects.filter(id=self.data['id']).values(*self.__class__.get_db_values())[0])
@classmethod
def get_partial(cls, id):
return cls(cls.model.objects.filter(id=id).values(*cls.get_db_values())[0])
@classmethod
def get_db_values(cls):
return cls.FIELDS
@classmethod
def filter_partial(cls, status=[]):
kv = {
'status__in': status
}
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
def get_job_type_str(self):
raise RuntimeError("Inherit and implement me")
def task_impact(self):
raise RuntimeError("Inherit and implement me")
@classmethod
def merge_values(cls, values):
grouped_results = itertools.groupby(values, key=lambda value: value['id'])
merged_values = []
for k, g in grouped_results:
groups = list(g)
merged_value = {}
for group in groups:
for key, val in group.iteritems():
if not merged_value.get(key):
merged_value[key] = val
elif val != merged_value[key]:
if isinstance(merged_value[key], list):
if val not in merged_value[key]:
merged_value[key].append(val)
else:
old_val = merged_value[key]
merged_value[key] = [old_val, val]
merged_values.append(merged_value)
return merged_values
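
merge_values collapses the duplicate rows that .values() returns across a to-many join, folding differing column values into lists. A small worked example of the intended behavior; the input rows are made up, not real query output:

# Two rows for the same job id, differing only in the joined dependent job id.
rows = [
    {'id': 1, 'status': 'pending', 'dependent_jobs__id': 7},
    {'id': 1, 'status': 'pending', 'dependent_jobs__id': 9},
]
merged = PartialModelDict.merge_values(rows)
# merged == [{'id': 1, 'status': 'pending', 'dependent_jobs__id': [7, 9]}]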
class JobDict(PartialModelDict):
FIELDS = (
'id', 'status', 'job_template_id', 'inventory_id', 'project_id',
'launch_type', 'limit', 'allow_simultaneous', 'created',
'job_type', 'celery_task_id', 'project__scm_update_on_launch',
'forks', 'start_args', 'dependent_jobs__id',
)
model = Job
def get_job_type_str(self):
return 'job'
def task_impact(self):
return (5 if self.data['forks'] == 0 else self.data['forks']) * 10
def get_inventory_sources_already_updated(self):
try:
start_args = json.loads(decrypt_field_value(self.data['id'], 'start_args', self.data['start_args']))
except Exception:
return []
start_args = start_args or {}
return start_args.get('inventory_sources_already_updated', [])
@classmethod
def filter_partial(cls, status=[]):
kv = {
'status__in': status
}
merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values()))
return [cls(o) for o in merged]
class ProjectUpdateDict(PartialModelDict):
FIELDS = (
'id', 'status', 'project_id', 'created', 'celery_task_id',
'launch_type', 'project__scm_update_cache_timeout',
'project__scm_update_on_launch',
)
model = ProjectUpdate
def get_job_type_str(self):
return 'project_update'
def task_impact(self):
return 10
@classmethod
def filter_partial(cls, status=[]):
kv = {
'status__in': status,
'job_type': 'check',
}
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
class ProjectUpdateLatestDict(ProjectUpdateDict):
FIELDS = (
'id', 'status', 'project_id', 'created', 'finished',
'project__scm_update_cache_timeout',
'launch_type', 'project__scm_update_on_launch',
)
model = ProjectUpdate
@classmethod
def filter_partial(cls, project_ids):
# TODO: This can surely be made more efficient
# * shouldn't have to do a query per inventory_id
# * shouldn't have to call .values() on all the results, only to get the first result
results = []
for project_id in project_ids:
qs = cls.model.objects.filter(project_id=project_id, status__in=['waiting', 'successful', 'failed']).order_by('-finished', '-started', '-created',)
if qs.count() > 0:
results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0]))
return results
class InventoryUpdateDict(PartialModelDict):
#'inventory_source__update_on_launch',
#'inventory_source__update_cache_timeout',
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'inventory_source__inventory_id',
)
model = InventoryUpdate
def get_job_type_str(self):
return 'inventory_update'
def task_impact(self):
return 20
class InventoryUpdateLatestDict(InventoryUpdateDict):
#'inventory_source__update_on_launch',
#'inventory_source__update_cache_timeout',
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'finished', 'inventory_source__update_cache_timeout', 'launch_type',
'inventory_source__update_on_launch',
)
model = InventoryUpdate
@classmethod
def filter_partial(cls, inventory_ids):
# TODO: This can surely be made more efficient
# * shouldn't have to do a query per inventory_id nor per inventory_source_id
# * shouldn't have to call .values() on all the results, only to get the first result
results = []
for inventory_id in inventory_ids:
inventory_source_ids = InventorySource.objects.filter(inventory_id=inventory_id,
update_on_launch=True).values_list('id', flat=True)
# Find the most recent inventory update for each inventory source
for inventory_source_id in inventory_source_ids:
qs = cls.model.objects.filter(inventory_source_id=inventory_source_id,
status__in=['waiting', 'successful', 'failed'],
inventory_source__update_on_launch=True).order_by('-finished', '-started', '-created')
if qs.count() > 0:
results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0]))
return results
class InventorySourceDict(PartialModelDict):
FIELDS = (
'id',
)
model = InventorySource
def get_job_type_str(self):
return 'inventory_source'
def task_impact(self):
return 20
@classmethod
# TODO: Optimize this to run the query once
def filter_partial(cls, inventory_id):
kv = {
'inventory_id': inventory_id,
'update_on_launch': True,
}
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
class SystemJobDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'celery_task_id',
)
model = SystemJob
def get_job_type_str(self):
return 'system_job'
def task_impact(self):
return 20
@classmethod
def filter_partial(cls, status=[]):
kv = {
'status__in': status
}
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
class AdHocCommandDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id', 'celery_task_id',
)
model = AdHocCommand
def get_job_type_str(self):
return 'ad_hoc_command'
def task_impact(self):
return 20
class WorkflowJobDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'workflow_job_template_id', 'allow_simultaneous',
)
model = WorkflowJob
def get_job_type_str(self):
return 'workflow_job'
def task_impact(self):
return 0

View File

@ -102,7 +102,7 @@ def _clear_cache_keys(set_of_keys):
cache.delete_many(set_of_keys)
@task(queue='broadcast_all')
@task(queue='tower_broadcast_all')
def process_cache_changes(cache_keys):
logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format(
process_cache_changes.request))
@ -114,7 +114,7 @@ def process_cache_changes(cache_keys):
break
@task(queue='default')
@task(queue='tower')
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@ -138,7 +138,7 @@ def send_notifications(notification_list, job_id=None):
notification.save()
@task(bind=True, queue='default')
@task(bind=True, queue='tower')
def run_administrative_checks(self):
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
@ -160,7 +160,7 @@ def run_administrative_checks(self):
fail_silently=True)
@task(bind=True, queue='default')
@task(bind=True, queue='tower')
def cleanup_authtokens(self):
logger.warn("Cleaning up expired authtokens.")
AuthToken.objects.filter(expires__lt=now()).delete()
@ -201,7 +201,7 @@ def cluster_node_heartbeat(self):
@task(bind=True, queue='default')
@task(bind=True, queue='tower')
def tower_periodic_scheduler(self):
run_now = now()
state = TowerScheduleState.get_solo()
@ -251,7 +251,7 @@ def _send_notification_templates(instance, status_str):
job_id=instance.id)
@task(bind=True, queue='default')
@task(bind=True, queue='tower')
def handle_work_success(self, result, task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@ -267,7 +267,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id)
@task(bind=True, queue='default')
@task(bind=True, queue='tower')
def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' %
(str(self.request.id), str(subtasks)))
@ -307,7 +307,7 @@ def handle_work_error(self, task_id, subtasks=None):
pass
@task(queue='default')
@task(queue='tower')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@ -1140,6 +1140,7 @@ class RunJob(BaseTask):
_eager_fields=dict(
job_type='run',
status='running',
instance_group = job.instance_group,
celery_task_id=job_request_id))
# save the associated job before calling run() so that a
# cancel() call on the job can cancel the project update
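
The decorator changes in this file only rename queues ('default' to 'tower', 'broadcast_all' to 'tower_broadcast_all'). A hedged sketch of a task bound to the renamed queue, following the same decorator pattern; the task itself is hypothetical and not part of this change:

# Hypothetical task bound to the renamed 'tower' queue, mirroring the
# @task(bind=True, queue='tower') pattern used throughout this file.
from celery import task

@task(bind=True, queue='tower')
def noop_health_check(self):
    return 'ok from %s' % self.request.hostname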

View File

@ -6,6 +6,8 @@ import pytest
from awx.main.tests.factories import (
create_organization,
create_job_template,
create_instance,
create_instance_group,
create_notification_template,
create_survey_spec,
create_workflow_job_template,
@ -32,6 +34,21 @@ def survey_spec_factory():
return create_survey_spec
@pytest.fixture
def instance_factory():
return create_instance
@pytest.fixture
def instance_group_factory():
return create_instance_group
@pytest.fixture
def default_instance_group(instance_factory, instance_group_factory):
return create_instance_group("tower", instances=[create_instance("hostA")])
@pytest.fixture
def job_template_with_survey_passwords_factory(job_template_factory):
def rf(persisted):

View File

@ -1,4 +1,6 @@
from .tower import (
create_instance,
create_instance_group,
create_organization,
create_job_template,
create_notification_template,
@ -11,6 +13,8 @@ from .exc import (
)
__all__ = [
'create_instance',
'create_instance_group',
'create_organization',
'create_job_template',
'create_notification_template',

View File

@ -7,6 +7,7 @@ from awx.main.models import (
Project,
Team,
Instance,
InstanceGroup,
JobTemplate,
Job,
NotificationTemplate,
@ -27,11 +28,23 @@ from awx.main.models import (
#
def mk_instance(persisted=True):
def mk_instance(persisted=True, hostname='instance.example.org'):
if not persisted:
raise RuntimeError('creating an Instance requires persisted=True')
from django.conf import settings
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org")
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname)[0]
def mk_instance_group(name='tower', instance=None):
ig, status = InstanceGroup.objects.get_or_create(name=name)
if instance is not None:
if type(instance) == list:
for i in instance:
ig.instances.add(i)
else:
ig.instances.add(instance)
ig.save()
return ig
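
A hypothetical usage of the two factories above in a test; the hostname and group name are illustrative:

# Hypothetical usage: mk_instance persists an Instance and mk_instance_group
# attaches it to a new or existing InstanceGroup.
inst = mk_instance(hostname='node1.example.org')
ig = mk_instance_group(name='tower', instance=inst)
assert inst in ig.instances.all()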
def mk_organization(name, description=None, persisted=True):
@ -86,9 +99,11 @@ def mk_project(name, organization=None, description=None, persisted=True):
def mk_credential(name, credential_type='ssh', persisted=True):
type_ = CredentialType.defaults[credential_type]()
if persisted:
type_, status = CredentialType.objects.get_or_create(kind=credential_type)
type_.save()
else:
type_ = CredentialType.defaults[credential_type]()
cred = Credential(
credential_type=type_,
name=name

View File

@ -19,6 +19,8 @@ from .objects import (
)
from .fixtures import (
mk_instance,
mk_instance_group,
mk_organization,
mk_team,
mk_user,
@ -129,6 +131,14 @@ def generate_teams(organization, persisted, **kwargs):
return teams
def create_instance(name, instance_groups=None):
return mk_instance(hostname=name)
def create_instance_group(name, instances=None):
return mk_instance_group(name=name, instance=instances)
def create_survey_spec(variables=None, default_type='integer', required=True):
'''
Returns a valid survey spec for a job template, based on the input
@ -234,6 +244,8 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
if 'survey' in kwargs:
spec = create_survey_spec(kwargs['survey'])
else:
spec = None
jt = mk_job_template(name, project=proj,
inventory=inv, credential=cred,
@ -248,8 +260,9 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
else:
# Fill in default survey answers
job_extra_vars = {}
for question in spec['spec']:
job_extra_vars[question['variable']] = question['default']
if spec is not None:
for question in spec['spec']:
job_extra_vars[question['variable']] = question['default']
jobs[i] = mk_job(job_template=jt, project=proj, inventory=inv, credential=cred,
extra_vars=job_extra_vars,
job_type=job_type, persisted=persisted)

View File

@ -0,0 +1,150 @@
import pytest
import mock
from datetime import timedelta
from awx.main.scheduler import TaskManager
@pytest.mark.django_db
def test_multi_group_basic_job_launch(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization='org2', project='proj2',
inventory='inv2', credential='cred2',
jobs=["job_should_still_start"])
objects2.job_template.instance_groups.add(ig2)
j2 = objects2.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)])
@pytest.mark.django_db
def test_multi_group_with_shared_dependency(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
objects1.job_template.instance_groups.add(ig1)
p = objects1.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 0
p.scm_type = "git"
p.scm_url = "http://github.com/ansible/ansible.git"
p.save()
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization=objects1.organization, project=p,
inventory='inv2', credential='cred2',
jobs=["job_should_still_start"])
objects2.job_template.instance_groups.add(ig2)
j2 = objects2.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
pu = p.project_updates.first()
TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [pu])
pu.finished = pu.created + timedelta(seconds=1)
pu.status = "successful"
pu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, ig1), mock.call(j2, ig2)])
@pytest.mark.django_db
def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i1.capacity = 1000
i1.save()
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2',
inventory='inv2', credential='cred2',
jobs=["job_should_start", "job_should_also_start"])
objects2.job_template.instance_groups.add(ig1)
j1_1 = objects2.jobs['job_should_also_start']
j1_1.status = 'pending'
j1_1.save()
objects3 = job_template_factory('jt3', organization='org2', project='proj3',
inventory='inv3', credential='cred3',
jobs=["job_should_still_start"])
objects3.job_template.instance_groups.add(ig2)
j2 = objects3.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
objects4 = job_template_factory('jt4', organization=objects3.organization, project='proj4',
inventory='inv4', credential='cred4',
jobs=["job_should_not_start"])
objects4.job_template.instance_groups.add(ig2)
j2_1 = objects4.jobs['job_should_not_start']
j2_1.status = 'pending'
j2_1.save()
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig1),
mock.call(j2, ig2)])
assert mock_job.call_count == 3
@pytest.mark.django_db
def test_failover_group_run(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2',
inventory='inv2', credential='cred2',
jobs=["job_should_start", "job_should_also_start"])
objects2.job_template.instance_groups.add(ig1)
objects2.job_template.instance_groups.add(ig2)
j1_1 = objects2.jobs['job_should_also_start']
j1_1.status = 'pending'
j1_1.save()
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1), mock.call(j1_1, ig2)])
assert mock_job.call_count == 2

View File

@ -0,0 +1,200 @@
import pytest
import mock
from datetime import timedelta
from awx.main.scheduler import TaskManager
@pytest.mark.django_db
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
assert TaskManager.start_task.called
assert TaskManager.start_task.call_args == ((j, default_instance_group),)
@pytest.mark.django_db
def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start", "job_should_not_start"])
j1 = objects.jobs["job_should_start"]
j1.status = 'pending'
j1.save()
j2 = objects.jobs["job_should_not_start"]
j2.status = 'pending'
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, default_instance_group)
j1.status = "successful"
j1.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, default_instance_group)
@pytest.mark.django_db
def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start", "job_should_not_start"])
jt = objects.job_template
jt.save()
j1 = objects.jobs["job_should_start"]
j1.allow_simultaneous = True
j1.status = 'pending'
j1.save()
j2 = objects.jobs["job_should_not_start"]
j2.allow_simultaneous = True
j2.status = 'pending'
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group),
mock.call(j2, default_instance_group)])
@pytest.mark.django_db
def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker):
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
objects2 = job_template_factory('jt2', organization='org2', project='proj2',
inventory='inv2', credential='cred2',
jobs=["job_should_not_start"])
j1 = objects1.jobs["job_should_start"]
j1.status = 'pending'
j1.save()
j2 = objects2.jobs["job_should_not_start"]
j2.status = 'pending'
j2.save()
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j1, default_instance_group)
j1.status = "successful"
j1.save()
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j2, default_instance_group)
@pytest.mark.django_db
def test_single_job_dependencies_project_launch(default_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
p = objects.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 0
p.scm_type = "git"
p.scm_url = "http://github.com/ansible/ansible.git"
p.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
tm.schedule()
mock_pu.assert_called_once_with(j)
pu = [x for x in p.project_updates.all()]
assert len(pu) == 1
TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [pu[0]])
pu[0].status = "successful"
pu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group)
@pytest.mark.django_db
def test_single_job_dependencies_inventory_update_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
i = objects.inventory
ii = inventory_source_factory("ec2")
ii.source = "ec2"
ii.update_on_launch = True
ii.update_cache_timeout = 0
ii.save()
i.inventory_sources.add(ii)
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
tm.schedule()
mock_iu.assert_called_once_with(j, ii)
iu = [x for x in ii.inventory_updates.all()]
assert len(iu) == 1
TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [iu[0]])
iu[0].status = "successful"
iu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group)
@pytest.mark.django_db
def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["first_job", "second_job"])
j1 = objects.jobs["first_job"]
j1.status = 'pending'
j1.save()
j2 = objects.jobs["second_job"]
j2.status = 'pending'
j2.save()
p = objects.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 300
p.scm_type = "git"
p.scm_url = "http://github.com/ansible/ansible.git"
p.save()
i = objects.inventory
ii = inventory_source_factory("ec2")
ii.source = "ec2"
ii.update_on_launch = True
ii.update_cache_timeout = 300
ii.save()
i.inventory_sources.add(ii)
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [pu, iu]),
mock.call(iu, default_instance_group, [pu, iu])])
pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1)
pu.save()
iu.status = "successful"
iu.finished = iu.created + timedelta(seconds=1)
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, default_instance_group)
j1.status = "successful"
j1.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, default_instance_group)
pu = [x for x in p.project_updates.all()]
iu = [x for x in ii.inventory_updates.all()]
assert len(pu) == 1
assert len(iu) == 1

View File

@ -0,0 +1,40 @@
import pytest
@pytest.mark.django_db
def test_default_tower_instance_group(default_instance_group, job_factory):
assert default_instance_group in job_factory().preferred_instance_groups
@pytest.mark.django_db
def test_basic_instance_group_membership(instance_group_factory, default_instance_group, job_factory):
j = job_factory()
ig = instance_group_factory("basicA", [default_instance_group.instances.first()])
j.job_template.instance_groups.add(ig)
assert ig in j.preferred_instance_groups
assert default_instance_group not in j.preferred_instance_groups
@pytest.mark.django_db
def test_inherited_instance_group_membership(instance_group_factory, default_instance_group, job_factory, project, inventory):
j = job_factory()
j.project = project
j.inventory = inventory
ig_org = instance_group_factory("basicA", [default_instance_group.instances.first()])
ig_inv = instance_group_factory("basicB", [default_instance_group.instances.first()])
j.project.organization.instance_groups.add(ig_org)
j.inventory.instance_groups.add(ig_inv)
assert ig_org in j.preferred_instance_groups
assert ig_inv in j.preferred_instance_groups
assert default_instance_group not in j.preferred_instance_groups
@pytest.mark.django_db
def test_instance_group_capacity(instance_factory, instance_group_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
ig_all = instance_group_factory("all", instances=[i1, i2, i3])
assert ig_all.capacity == 300
ig_single = instance_group_factory("single", instances=[i1])
assert ig_single.capacity == 100

View File

@ -0,0 +1,79 @@
import pytest
from awx.main.access import (
InstanceGroupAccess,
OrganizationAccess,
InventoryAccess,
JobTemplateAccess,
)
@pytest.mark.django_db
def test_ig_normal_user_visibility(organization, default_instance_group, user):
u = user('user', False)
assert len(InstanceGroupAccess(u).get_queryset()) == 0
organization.instance_groups.add(default_instance_group)
organization.member_role.members.add(u)
assert len(InstanceGroupAccess(u).get_queryset()) == 0
@pytest.mark.django_db
def test_ig_admin_user_visibility(organization, default_instance_group, admin, system_auditor, org_admin):
assert len(InstanceGroupAccess(admin).get_queryset()) == 1
assert len(InstanceGroupAccess(system_auditor).get_queryset()) == 1
assert len(InstanceGroupAccess(org_admin).get_queryset()) == 0
organization.instance_groups.add(default_instance_group)
assert len(InstanceGroupAccess(org_admin).get_queryset()) == 1
@pytest.mark.django_db
def test_ig_normal_user_associability(organization, default_instance_group, user):
u = user('user', False)
access = OrganizationAccess(u)
assert not access.can_attach(organization, default_instance_group, 'instance_groups', None)
organization.instance_groups.add(default_instance_group)
organization.member_role.members.add(u)
assert not access.can_attach(organization, default_instance_group, 'instance_groups', None)
@pytest.mark.django_db
def test_ig_associability(organization, default_instance_group, admin, system_auditor, org_admin, org_member, job_template_factory):
admin_access = OrganizationAccess(admin)
auditor_access = OrganizationAccess(system_auditor)
oadmin_access = OrganizationAccess(org_admin)
omember_access = OrganizationAccess(org_member)
assert admin_access.can_attach(organization, default_instance_group, 'instance_groups', None)
assert not oadmin_access.can_attach(organization, default_instance_group, 'instance_groups', None)
assert not auditor_access.can_attach(organization, default_instance_group, 'instance_groups', None)
assert not omember_access.can_attach(organization, default_instance_group, 'instance_groups', None)
organization.instance_groups.add(default_instance_group)
assert admin_access.can_unattach(organization, default_instance_group, 'instance_groups', None)
assert oadmin_access.can_unattach(organization, default_instance_group, 'instance_groups', None)
assert not auditor_access.can_unattach(organization, default_instance_group, 'instance_groups', None)
assert not omember_access.can_unattach(organization, default_instance_group, 'instance_groups', None)
objects = job_template_factory('jt', organization=organization, project='p',
inventory='i', credential='c')
admin_access = InventoryAccess(admin)
auditor_access = InventoryAccess(system_auditor)
oadmin_access = InventoryAccess(org_admin)
omember_access = InventoryAccess(org_member)
assert admin_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None)
assert oadmin_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None)
assert not auditor_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None)
assert not omember_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None)
admin_access = JobTemplateAccess(admin)
auditor_access = JobTemplateAccess(system_auditor)
oadmin_access = JobTemplateAccess(org_admin)
omember_access = JobTemplateAccess(org_member)
assert admin_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None)
assert oadmin_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None)
assert not auditor_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None)
assert not omember_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None)

View File

@ -11,7 +11,7 @@ from rest_framework.exceptions import PermissionDenied
# AWX
from awx.api.generics import (
ParentMixin,
SubListCreateAttachDetachAPIView,
SubListCreateAttachDetachAPIView, SubListAttachDetachAPIView,
DeleteLastUnattachLabelMixin,
ResourceAccessList
)
@ -140,6 +140,20 @@ class TestSubListCreateAttachDetachAPIView:
view.unattach_by_id.assert_not_called()
def test_attach_detatch_only(mocker):
mock_request = mocker.MagicMock()
mock_request.data = {'name': 'name for my new model'}
view = SubListAttachDetachAPIView()
view.model = mocker.MagicMock()
view.model._meta = mocker.MagicMock()
view.model._meta.verbose_name = "Foo Bar"
resp = view.post(mock_request)
assert 'Foo Bar' in resp.data['msg']
assert 'field is missing' in resp.data['msg']
class TestDeleteLastUnattachLabelMixin:
@mock.patch('__builtin__.super')
def test_unattach_ok(self, super, mocker):

View File

@ -1,265 +0,0 @@
# Python
import pytest
from datetime import timedelta
# Django
from django.utils.timezone import now as tz_now
# awx
from awx.main.scheduler.partial import (
JobDict,
ProjectUpdateDict,
InventoryUpdateDict,
InventorySourceDict,
)
from awx.main.scheduler import TaskManager
@pytest.fixture
def epoch():
return tz_now()
@pytest.fixture
def scheduler_factory(mocker, epoch):
mocker.patch('awx.main.models.Instance.objects.total_capacity', return_value=10000)
def fn(tasks=[], inventory_sources=[], latest_project_updates=[], latest_inventory_updates=[], create_project_update=None, create_inventory_update=None):
sched = TaskManager()
sched.graph.get_now = lambda: epoch
def no_create_inventory_update(task, ignore):
raise RuntimeError("create_inventory_update should not be called")
def no_create_project_update(task):
raise RuntimeError("create_project_update should not be called")
mocker.patch.object(sched, 'capture_chain_failure_dependencies')
mocker.patch.object(sched, 'get_tasks', return_value=tasks)
mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[])
mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources)
mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates)
mocker.patch.object(sched, 'get_latest_inventory_update_tasks', return_value=latest_inventory_updates)
create_project_update_mock = mocker.patch.object(sched, 'create_project_update', return_value=create_project_update)
create_inventory_update_mock = mocker.patch.object(sched, 'create_inventory_update', return_value=create_inventory_update)
mocker.patch.object(sched, 'start_task')
if not create_project_update:
create_project_update_mock.side_effect = no_create_project_update
if not create_inventory_update:
create_inventory_update_mock.side_effect = no_create_inventory_update
return sched
return fn
@pytest.fixture
def project_update_factory(epoch):
def fn():
return ProjectUpdateDict({
'id': 1,
'created': epoch - timedelta(seconds=100),
'project_id': 1,
'project__scm_update_cache_timeout': 0,
'celery_task_id': '',
'launch_type': 'dependency',
'project__scm_update_on_launch': True,
})
return fn
@pytest.fixture
def pending_project_update(project_update_factory):
project_update = project_update_factory()
project_update['status'] = 'pending'
return project_update
@pytest.fixture
def waiting_project_update(epoch, project_update_factory):
project_update = project_update_factory()
project_update['status'] = 'waiting'
return project_update
@pytest.fixture
def running_project_update(epoch, project_update_factory):
project_update = project_update_factory()
project_update['status'] = 'running'
return project_update
@pytest.fixture
def successful_project_update(epoch, project_update_factory):
project_update = project_update_factory()
project_update['finished'] = epoch - timedelta(seconds=90)
project_update['status'] = 'successful'
return project_update
@pytest.fixture
def successful_project_update_cache_expired(epoch, project_update_factory):
project_update = project_update_factory()
project_update['status'] = 'successful'
project_update['created'] = epoch - timedelta(seconds=120)
project_update['finished'] = epoch - timedelta(seconds=110)
project_update['project__scm_update_cache_timeout'] = 1
return project_update
@pytest.fixture
def failed_project_update(epoch, project_update_factory):
project_update = project_update_factory()
project_update['finished'] = epoch - timedelta(seconds=90)
project_update['status'] = 'failed'
return project_update
@pytest.fixture
def inventory_update_factory(epoch):
def fn():
return InventoryUpdateDict({
'id': 1,
'created': epoch - timedelta(seconds=101),
'inventory_id': 1,
'celery_task_id': '',
'status': 'pending',
'launch_type': 'dependency',
'inventory_source_id': 1,
'inventory_source__inventory_id': 1,
})
return fn
@pytest.fixture
def inventory_update_latest_factory(epoch):
def fn():
return InventoryUpdateDict({
'id': 1,
'created': epoch - timedelta(seconds=101),
'inventory_id': 1,
'celery_task_id': '',
'status': 'pending',
'launch_type': 'dependency',
'inventory_source_id': 1,
'finished': None,
})
return fn
@pytest.fixture
def inventory_update_latest(inventory_update_latest_factory):
return inventory_update_latest_factory()
@pytest.fixture
def successful_inventory_update_latest(inventory_update_latest_factory):
iu = inventory_update_latest_factory()
iu['status'] = 'successful'
iu['finished'] = iu['created'] + timedelta(seconds=10)
return iu
@pytest.fixture
def failed_inventory_update_latest(inventory_update_latest_factory):
iu = inventory_update_latest_factory()
iu['status'] = 'failed'
return iu
@pytest.fixture
def pending_inventory_update(epoch, inventory_update_factory):
inventory_update = inventory_update_factory()
inventory_update['status'] = 'pending'
return inventory_update
@pytest.fixture
def waiting_inventory_update(epoch, inventory_update_factory):
inventory_update = inventory_update_factory()
inventory_update['status'] = 'waiting'
return inventory_update
@pytest.fixture
def failed_inventory_update(epoch, inventory_update_factory):
inventory_update = inventory_update_factory()
inventory_update['status'] = 'failed'
return inventory_update
@pytest.fixture
def running_inventory_update(epoch, inventory_update_factory):
inventory_update = inventory_update_factory()
inventory_update['status'] = 'running'
return inventory_update
@pytest.fixture
def successful_inventory_update(epoch, inventory_update_factory):
inventory_update = inventory_update_factory()
inventory_update['finished'] = epoch - timedelta(seconds=90)
inventory_update['status'] = 'successful'
return inventory_update
@pytest.fixture
def job_factory(epoch):
'''
Job
'''
def fn(id=1, project__scm_update_on_launch=True, inventory__inventory_sources=[], allow_simultaneous=False):
return JobDict({
'id': id,
'status': 'pending',
'job_template_id': 1,
'project_id': 1,
'inventory_id': 1,
'launch_type': 'manual',
'allow_simultaneous': allow_simultaneous,
'created': epoch - timedelta(seconds=99),
'celery_task_id': '',
'project__scm_update_on_launch': project__scm_update_on_launch,
'inventory__inventory_sources': inventory__inventory_sources,
'forks': 5,
'dependent_jobs__id': None,
})
return fn
@pytest.fixture
def pending_job(job_factory):
job = job_factory()
job['status'] = 'pending'
return job
@pytest.fixture
def running_job(job_factory):
job = job_factory()
job['status'] = 'running'
return job
@pytest.fixture
def inventory_source_factory():
'''
Inventory id -> [InventorySourceDict, ...]
'''
def fn(id=1):
return InventorySourceDict({
'id': id,
})
return fn
@pytest.fixture
def inventory_id_sources(inventory_source_factory):
return [
(1, [
inventory_source_factory(id=1),
inventory_source_factory(id=2),
]),
]

View File

@ -1,198 +0,0 @@
# Python
import pytest
# AWX
from awx.main.scheduler.dag_simple import SimpleDAG
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.models import Job, JobTemplate
from awx.main.models.workflow import WorkflowJobNode
@pytest.fixture
def dag_root():
dag = SimpleDAG()
data = [
{1: 1},
{2: 2},
{3: 3},
{4: 4},
{5: 5},
{6: 6},
]
# Add all the nodes to the DAG
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[1])
dag.add_edge(data[2], data[3])
dag.add_edge(data[4], data[5])
return dag
@pytest.fixture
def dag_simple_edge_labels():
dag = SimpleDAG()
data = [
{1: 1},
{2: 2},
{3: 3},
{4: 4},
{5: 5},
{6: 6},
]
# Add all the nodes to the DAG
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[1], 'one')
dag.add_edge(data[2], data[3], 'two')
dag.add_edge(data[4], data[5], 'three')
return dag
'''
class TestSimpleDAG(object):
def test_get_root_nodes(self, dag_root):
leafs = dag_root.get_leaf_nodes()
roots = dag_root.get_root_nodes()
def test_get_labeled_edges(self, dag_simple_edge_labels):
dag = dag_simple_edge_labels
nodes = dag.get_dependencies(dag.nodes[0]['node_object'], 'one')
nodes = dag.get_dependencies(dag.nodes[0]['node_object'], 'two')
'''
@pytest.fixture
def factory_node():
def fn(id, status):
wfn = WorkflowJobNode(id=id)
if status:
j = Job(status=status)
wfn.job = j
wfn.unified_job_template = JobTemplate(name='JT{}'.format(id))
return wfn
return fn
@pytest.fixture
def workflow_dag_level_2(factory_node):
dag = WorkflowDAG()
data = [
factory_node(0, 'successful'),
factory_node(1, 'successful'),
factory_node(2, 'successful'),
factory_node(3, None),
factory_node(4, None),
factory_node(5, None),
]
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[3], 'success_nodes')
dag.add_edge(data[1], data[4], 'success_nodes')
dag.add_edge(data[2], data[5], 'success_nodes')
return (dag, data[3:6], False)
@pytest.fixture
def workflow_dag_multiple_roots(factory_node):
dag = WorkflowDAG()
data = [
factory_node(1, None),
factory_node(2, None),
factory_node(3, None),
factory_node(4, None),
factory_node(5, None),
factory_node(6, None),
]
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[3], 'success_nodes')
dag.add_edge(data[1], data[4], 'success_nodes')
dag.add_edge(data[2], data[5], 'success_nodes')
expected = data[0:3]
return (dag, expected, False)
@pytest.fixture
def workflow_dag_multiple_edges_labeled(factory_node):
dag = WorkflowDAG()
data = [
factory_node(0, 'failed'),
factory_node(1, None),
factory_node(2, 'failed'),
factory_node(3, None),
factory_node(4, 'failed'),
factory_node(5, None),
]
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[1], 'success_nodes')
dag.add_edge(data[0], data[2], 'failure_nodes')
dag.add_edge(data[2], data[3], 'success_nodes')
dag.add_edge(data[2], data[4], 'failure_nodes')
dag.add_edge(data[4], data[5], 'failure_nodes')
expected = data[5:6]
return (dag, expected, False)
@pytest.fixture
def workflow_dag_finished(factory_node):
dag = WorkflowDAG()
data = [
factory_node(0, 'failed'),
factory_node(1, None),
factory_node(2, 'failed'),
factory_node(3, None),
factory_node(4, 'failed'),
factory_node(5, 'successful'),
]
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[1], 'success_nodes')
dag.add_edge(data[0], data[2], 'failure_nodes')
dag.add_edge(data[2], data[3], 'success_nodes')
dag.add_edge(data[2], data[4], 'failure_nodes')
dag.add_edge(data[4], data[5], 'failure_nodes')
expected = []
return (dag, expected, True)
@pytest.fixture
def workflow_dag_always(factory_node):
dag = WorkflowDAG()
data = [
factory_node(0, 'failed'),
factory_node(1, 'successful'),
factory_node(2, None),
]
[dag.add_node(d) for d in data]
dag.add_edge(data[0], data[1], 'always_nodes')
dag.add_edge(data[1], data[2], 'always_nodes')
expected = data[2:3]
return (dag, expected, False)
@pytest.fixture(params=['workflow_dag_multiple_roots', 'workflow_dag_level_2',
'workflow_dag_multiple_edges_labeled', 'workflow_dag_finished',
'workflow_dag_always'])
def workflow_dag(request):
return request.getfuncargvalue(request.param)
class TestWorkflowDAG():
def test_bfs_nodes_to_run(self, workflow_dag):
dag, expected, is_done = workflow_dag
assert dag.bfs_nodes_to_run() == expected
def test_is_workflow_done(self, workflow_dag):
dag, expected, is_done = workflow_dag
assert dag.is_workflow_done() == is_done

View File

@ -1,121 +0,0 @@
# Python
import pytest
from datetime import timedelta
# Django
from django.utils.timezone import now as tz_now
# AWX
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.partial import ProjectUpdateDict
@pytest.fixture
def graph():
return DependencyGraph()
@pytest.fixture
def job(job_factory):
j = job_factory()
j.project_id = 1
return j
@pytest.fixture
def unsuccessful_last_project(graph, job):
pu = ProjectUpdateDict(dict(id=1,
project__scm_update_cache_timeout=999999,
project_id=1,
status='failed',
created='3',
finished='3',))
graph.add_latest_project_update(pu)
return graph
@pytest.fixture
def last_dependent_project(graph, job):
now = tz_now()
job['project_id'] = 1
job['created'] = now
pu = ProjectUpdateDict(dict(id=1, project_id=1, status='waiting',
project__scm_update_cache_timeout=0,
launch_type='dependency',
created=now - timedelta(seconds=1),))
graph.add_latest_project_update(pu)
return (graph, job)
@pytest.fixture
def timedout_project_update(graph, job):
now = tz_now()
job['project_id'] = 1
job['created'] = now
pu = ProjectUpdateDict(dict(id=1, project_id=1, status='successful',
project__scm_update_cache_timeout=10,
launch_type='dependency',
created=now - timedelta(seconds=100),
finished=now - timedelta(seconds=11),))
graph.add_latest_project_update(pu)
return (graph, job)
@pytest.fixture
def not_timedout_project_update(graph, job):
now = tz_now()
job['project_id'] = 1
job['created'] = now
pu = ProjectUpdateDict(dict(id=1, project_id=1, status='successful',
project__scm_update_cache_timeout=3600,
launch_type='dependency',
created=now - timedelta(seconds=100),
finished=now - timedelta(seconds=11),))
graph.add_latest_project_update(pu)
return (graph, job)
class TestShouldUpdateRelatedProject():
def test_no_project_updates(self, graph, job):
actual = graph.should_update_related_project(job)
assert True is actual
def test_timedout_project_update(self, timedout_project_update):
(graph, job) = timedout_project_update
actual = graph.should_update_related_project(job)
assert True is actual
def test_not_timedout_project_update(self, not_timedout_project_update):
(graph, job) = not_timedout_project_update
actual = graph.should_update_related_project(job)
assert False is actual
def test_unsuccessful_last_project(self, unsuccessful_last_project, job):
graph = unsuccessful_last_project
actual = graph.should_update_related_project(job)
assert True is actual
def test_last_dependent_project(self, last_dependent_project):
(graph, job) = last_dependent_project
actual = graph.should_update_related_project(job)
assert False is actual

View File

@ -1,132 +0,0 @@
# Python
import pytest
from datetime import timedelta
@pytest.fixture
def pending_job(job_factory):
return job_factory(project__scm_update_on_launch=False, inventory__inventory_sources=['1'])
@pytest.fixture
def successful_inventory_update_latest(inventory_update_latest_factory):
iu = inventory_update_latest_factory()
iu['inventory_source__update_cache_timeout'] = 100
iu['status'] = 'successful'
iu['finished'] = iu['created'] + timedelta(seconds=10)
return iu
@pytest.fixture
def successful_inventory_update_latest_cache_expired(inventory_update_latest_factory):
iu = inventory_update_latest_factory()
iu['inventory_source__update_cache_timeout'] = 1
iu['finished'] = iu['created'] + timedelta(seconds=2)
return iu
@pytest.fixture
def failed_inventory_update_latest_cache_zero(failed_inventory_update_latest):
iu = failed_inventory_update_latest
iu['inventory_source__update_cache_timeout'] = 0
iu['inventory_source__update_on_launch'] = True
iu['finished'] = iu['created'] + timedelta(seconds=2)
iu['status'] = 'failed'
return iu
@pytest.fixture
def failed_inventory_update_latest_cache_non_zero(failed_inventory_update_latest_cache_zero):
failed_inventory_update_latest_cache_zero['inventory_source__update_cache_timeout'] = 10000000
return failed_inventory_update_latest_cache_zero
class TestStartInventoryUpdate():
def test_pending(self, scheduler_factory, pending_inventory_update):
scheduler = scheduler_factory(tasks=[pending_inventory_update])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_inventory_update)
class TestInventoryUpdateBlocked():
def test_running_inventory_update(self, epoch, scheduler_factory, running_inventory_update, pending_inventory_update):
running_inventory_update['created'] = epoch - timedelta(seconds=100)
pending_inventory_update['created'] = epoch - timedelta(seconds=90)
scheduler = scheduler_factory(tasks=[running_inventory_update, pending_inventory_update])
scheduler._schedule()
def test_waiting_inventory_update(self, epoch, scheduler_factory, waiting_inventory_update, pending_inventory_update):
waiting_inventory_update['created'] = epoch - timedelta(seconds=100)
pending_inventory_update['created'] = epoch - timedelta(seconds=90)
scheduler = scheduler_factory(tasks=[waiting_inventory_update, pending_inventory_update])
scheduler._schedule()
class TestCreateDependentInventoryUpdate():
def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[pending_job],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
def test_cache_hit(self, scheduler_factory, pending_job, successful_inventory_update, successful_inventory_update_latest):
scheduler = scheduler_factory(tasks=[successful_inventory_update, pending_job],
latest_inventory_updates=[successful_inventory_update_latest])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_job)
def test_cache_miss(self, scheduler_factory, pending_job, successful_inventory_update, successful_inventory_update_latest_cache_expired, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[successful_inventory_update, pending_job],
latest_inventory_updates=[successful_inventory_update_latest_cache_expired],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
def test_last_update_timeout_zero_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest_cache_zero, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[failed_inventory_update, pending_job],
latest_inventory_updates=[failed_inventory_update_latest_cache_zero],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
def test_last_update_timeout_non_zero_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest_cache_non_zero, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[failed_inventory_update, pending_job],
latest_inventory_updates=[failed_inventory_update_latest_cache_non_zero],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
class TestCaptureChainFailureDependencies():
@pytest.fixture
def inventory_id_sources(self, inventory_source_factory):
return [
(1, [inventory_source_factory(id=1)]),
]
def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[pending_job],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.capture_chain_failure_dependencies.assert_called_with(pending_job, [waiting_inventory_update])

View File

@ -1,86 +0,0 @@
# Python
import pytest
from datetime import timedelta
class TestJobBlocked():
def test_inventory_update_waiting(self, scheduler_factory, waiting_inventory_update, pending_job):
scheduler = scheduler_factory(tasks=[waiting_inventory_update, pending_job])
scheduler._schedule()
scheduler.start_task.assert_not_called()
def test_inventory_update_running(self, scheduler_factory, running_inventory_update, pending_job, inventory_source_factory, inventory_id_sources):
scheduler = scheduler_factory(tasks=[running_inventory_update, pending_job],
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_not_called()
def test_project_update_running(self, scheduler_factory, pending_job, running_project_update):
scheduler = scheduler_factory(tasks=[running_project_update, pending_job])
scheduler._schedule()
scheduler.start_task.assert_not_called()
assert scheduler.create_project_update.call_count == 0
def test_project_update_waiting(self, scheduler_factory, pending_job, waiting_project_update):
scheduler = scheduler_factory(tasks=[waiting_project_update, pending_job],
latest_project_updates=[waiting_project_update])
scheduler._schedule()
scheduler.start_task.assert_not_called()
assert scheduler.create_project_update.call_count == 0
class TestJob():
@pytest.fixture
def successful_project_update(self, project_update_factory):
project_update = project_update_factory()
project_update['status'] = 'successful'
project_update['finished'] = project_update['created'] + timedelta(seconds=10)
project_update['project__scm_update_cache_timeout'] = 3600
return project_update
def test_existing_dependencies_finished(self, scheduler_factory, successful_project_update, successful_inventory_update_latest, pending_job):
scheduler = scheduler_factory(tasks=[successful_project_update, pending_job],
latest_project_updates=[successful_project_update],
latest_inventory_updates=[successful_inventory_update_latest])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_job)
class TestCapacity():
@pytest.fixture
def pending_job_high_impact(self, mocker, job_factory):
pending_job = job_factory(project__scm_update_on_launch=False)
mocker.patch.object(pending_job, 'task_impact', return_value=10001)
return pending_job
def test_no_capacity(self, scheduler_factory, pending_job_high_impact):
scheduler = scheduler_factory(tasks=[pending_job_high_impact])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_job_high_impact)
@pytest.fixture
def pending_jobs_impactful(self, mocker, job_factory):
pending_jobs = [job_factory(id=i + 1, project__scm_update_on_launch=False, allow_simultaneous=True) for i in xrange(0, 3)]
map(lambda pending_job: mocker.patch.object(pending_job, 'task_impact', return_value=10), pending_jobs)
return pending_jobs
def test_capacity_exhausted(self, mocker, scheduler_factory, pending_jobs_impactful):
scheduler = scheduler_factory(tasks=pending_jobs_impactful)
scheduler._schedule()
calls = [mocker.call(job) for job in pending_jobs_impactful]
scheduler.start_task.assert_has_calls(calls)

View File

@ -1,75 +0,0 @@
# TODO: wherever get_latest_project_update_task() is stubbed and returns a
# ProjectUpdateDict. We should instead return a ProjectUpdateLatestDict()
# For now, this is ok since the fields don't deviate that much.
class TestStartProjectUpdate():
def test(self, scheduler_factory, pending_project_update):
scheduler = scheduler_factory(tasks=[pending_project_update])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_project_update)
assert scheduler.create_project_update.call_count == 0
'''
Explicit project updates should always run. They should not use cache logic.
'''
def test_cache_oblivious(self, scheduler_factory, successful_project_update, pending_project_update):
scheduler = scheduler_factory(tasks=[pending_project_update],
latest_project_updates=[successful_project_update])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_project_update)
assert scheduler.create_project_update.call_count == 0
class TestCreateDependentProjectUpdate():
def test(self, scheduler_factory, pending_job, waiting_project_update):
scheduler = scheduler_factory(tasks=[pending_job],
create_project_update=waiting_project_update)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_project_update, [pending_job])
def test_cache_hit(self, scheduler_factory, pending_job, successful_project_update):
scheduler = scheduler_factory(tasks=[successful_project_update, pending_job],
latest_project_updates=[successful_project_update])
scheduler._schedule()
scheduler.start_task.assert_called_with(pending_job)
def test_cache_miss(self, scheduler_factory, pending_job, successful_project_update_cache_expired, waiting_project_update):
scheduler = scheduler_factory(tasks=[successful_project_update_cache_expired, pending_job],
latest_project_updates=[successful_project_update_cache_expired],
create_project_update=waiting_project_update)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_project_update, [pending_job])
def test_last_update_failed(self, scheduler_factory, pending_job, failed_project_update, waiting_project_update):
scheduler = scheduler_factory(tasks=[failed_project_update, pending_job],
latest_project_updates=[failed_project_update],
create_project_update=waiting_project_update)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_project_update, [pending_job])
class TestProjectUpdateBlocked():
def test_project_update_running(self, scheduler_factory, running_project_update, pending_project_update):
scheduler = scheduler_factory(tasks=[running_project_update, pending_project_update])
scheduler._schedule()
scheduler.start_task.assert_not_called()
assert scheduler.create_project_update.call_count == 0
def test_job_running(self, scheduler_factory, running_job, pending_project_update):
scheduler = scheduler_factory(tasks=[running_job, pending_project_update])
scheduler._schedule()
scheduler.start_task.assert_not_called()

View File

@ -400,7 +400,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_QUEUE = 'tower'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
@ -414,25 +414,18 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('jobs', Exchange('jobs'), routing_key='jobs'),
Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False),
Broadcast('broadcast_all')
# Projects use a fanout queue, this isn't super well supported
Queue('tower', Exchange('tower'), routing_key='tower'),
Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False),
Broadcast('tower_broadcast_all')
)
CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_project_update': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_system_job': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler',
'routing_key': 'scheduler.job.launch'},
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
'routing_key': 'scheduler.job.complete'},
CELERY_ROUTES = {'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs': {'queue': 'tower',
'routing_key': 'tower'},
'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
'routing_key': 'tower'},
'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler',
'routing_key': 'tower_scheduler.job.launch'},
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'tower_scheduler',
'routing_key': 'tower_scheduler.job.complete'},
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default',
'routing_key': 'cluster.heartbeat'},
'awx.main.tasks.purge_old_stdout_files': {'queue': 'default',

View File

@ -29,3 +29,8 @@
values from within your custom inventory script (by - for example - reading
an environment variable or a file's contents)
[[#5879](https://github.com/ansible/ansible-tower/issues/5879)]
* Added support for configuring groups of instance nodes to run tower
jobs [[#5898](https://github.com/ansible/ansible-tower/issues/5898)]
* Fixed an issue installing Tower on multiple nodes where cluster
internal node references are used
[[#6231](https://github.com/ansible/ansible-tower/pull/6231)]

View File

@ -1,42 +1,66 @@
## Tower Clustering/HA Overview
Prior to 3.1 the Ansible Tower HA solution was not a true high-availability system. In 3.1 we have rewritten this system entirely with a new focus in mind:
Prior to 3.1 the Ansible Tower HA solution was not a true high-availability system. In 3.1 we have rewritten this system entirely with a new focus towards
a proper highly available clustered system. In 3.2 we have extended this further to allow grouping of clustered instances into different pools/queues.
* Each node should be able to act as an entrypoint for UI and API Access.
This should enable Tower administrators to use load balancers in front of as many nodes as they wish
* Each instance should be able to act as an entrypoint for UI and API Access.
This should enable Tower administrators to use load balancers in front of as many instances as they wish
and maintain good data visibility.
* Each node should be able to join the Tower cluster and expand its ability to execute jobs. This is currently
a naive system where jobs can and will run anywhere rather than be directed on where to run. *That* work will
be done later when building out the Federation/Rampart system.
* Provisioning new nodes should be as simple as updating the `inventory` file and re-running the setup playbook
* Nodes can be deprovisioned with a simple management commands
* Each instance should be able to join the Tower cluster and expand its ability to execute jobs.
* Provisioning new instances should be as simple as updating the `inventory` file and re-running the setup playbook
* Instances can be deprovisioned with a simple management command
* Instances can be grouped into one or more Instance Groups to share resources for topical purposes.
* These instance groups should be assignable to certain resources:
* Organizations
* Inventories
* Job Templates
such that execution of jobs under those resources will favor particular queues
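For illustration only (not part of this commit), associating an existing instance group with one of these resources through the v2 API might look like the sketch below; the hostname, credentials, group name, and job template id are placeholders, not values taken from this change.
```
# Hypothetical sketch: attach an existing instance group to a job template
# via its instance_groups sub-list. Host, credentials, and ids are placeholders.
import requests

TOWER = 'https://tower.example.com'   # placeholder
AUTH = ('admin', 'password')          # placeholder credentials

# Look up an instance group named "east" (name is illustrative).
groups = requests.get('{}/api/v2/instance_groups/'.format(TOWER),
                      params={'name': 'east'}, auth=AUTH, verify=False).json()
group_id = groups['results'][0]['id']

# Associate it with job template 42; only an existing id can be posted here,
# creating a new group through this sub-list is not allowed.
resp = requests.post('{}/api/v2/job_templates/42/instance_groups/'.format(TOWER),
                     json={'id': group_id}, auth=AUTH, verify=False)
print(resp.status_code)   # expect 204 on a successful association
```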
It's important to point out a few existing things:
* PostgreSQL is still a standalone instance node and is not clustered. We also won't manage replica configuration or,
* PostgreSQL is still a standalone instance and is not clustered. We also won't manage replica configuration or,
if the user configures standby replicas, database failover.
* All nodes should be reachable from all other nodes and they should be able to reach the database. It's also important
* All instances should be reachable from all other instances and they should be able to reach the database. It's also important
for the hosts to have a stable address and/or hostname (depending on how you configure the Tower host)
* RabbitMQ is the cornerstone of Tower's Clustering system. A lot of our configuration requirements and behavior is dictated
by its needs. Thus we are pretty inflexible to customization beyond what our setup playbook allows. Each Tower node has a
deployment of RabbitMQ that will cluster with the other nodes' RabbitMQ instances.
* Existing old-style HA deployments will be transitioned automatically to the new HA system during the upgrade process.
* Manual projects will need to be synced to all nodes by the customer
by its needs. Thus we are pretty inflexible to customization beyond what our setup playbook allows. Each Tower instance has a
deployment of RabbitMQ that will cluster with the other instances' RabbitMQ instances.
* Existing old-style HA deployments will be transitioned automatically to the new HA system during the upgrade process to 3.1.
* Manual projects will need to be synced to all instances by the customer
## Important Changes
* There is no concept of primary/secondary in the new Tower system. *All* systems are primary.
* Setup playbook changes to configure rabbitmq and give hints to the type of network the hosts are on.
* The `inventory` file for Tower deployments should be saved/persisted. If new nodes are to be provisioned
* The `inventory` file for Tower deployments should be saved/persisted. If new instances are to be provisioned
the passwords and configuration options as well as host names will need to be available to the installer.
## Concepts and Configuration
### Installation and the Inventory File
The current standalone node configuration doesn't change for a 3.1 deploy. The inventory file does change in some important ways:
The current standalone instance configuration doesn't change for a 3.1+ deploy. The inventory file does change in some important ways:
* Since there is no primary/secondary configuration those inventory groups go away and are replaced with a
single inventory group `tower`. The `database` group remains for specifying an external postgres, however:
single inventory group `tower`. The customer may, *optionally*, define other groups and group instances in those groups. These groups
should be prefixed with `instance_group_`. Instances are not required to be in the `tower` group alongside other `instance_group_` groups, but one
instance *must* be present in the `tower` group. Technically `tower` is a group like any other `instance_group_` group but it must always be present
and if a specific group is not associated with a specific resource then job execution will always fall back to the `tower` group:
```
[tower]
hostA
hostB
hostC
[instance_group_east]
hostB
hostC
[instance_group_west]
hostC
hostD
```
The `database` group remains for specifying an external postgres. If the database host is provisioned separately, this group should be empty:
```
[tower]
hostA
@ -46,23 +70,34 @@ The current standalone node configuration doesn't change for a 3.1 deploy. The i
[database]
hostDB
```
* It's common for customers to provision Tower instances externally but prefer to reference them by internal addressing. This is most significant
for RabbitMQ clustering where the service isn't available at all on an external interface. For this purpose it is necessary to assign the internal
address for RabbitMQ links as such:
```
[tower]
hostA rabbitmq_host=10.1.0.2
hostB rabbitmq_host=10.1.0.3
hostC rabbitmq_host=10.1.0.4
```
* The `redis_password` field is removed from `[all:vars]`
* There are various new fields for RabbitMQ:
- `rabbitmq_port=5672` - RabbitMQ is installed on each node and is not optional, it's also not possible to externalize. It is
- `rabbitmq_port=5672` - RabbitMQ is installed on each instance and is not optional; it's also not possible to externalize. It is
possible to configure what port it listens on and this setting controls that.
- `rabbitmq_vhost=tower` - Tower configures a RabbitMQ virtualhost to isolate itself. This controls that setting.
- `rabbitmq_username=tower` and `rabbitmq_password=tower` - Each node will be configured with these values and each node's Tower
- `rabbitmq_username=tower` and `rabbitmq_password=tower` - Each instance will be configured with these values and each instance's Tower
application will be configured with them as well. This is similar to our other uses of usernames/passwords.
- `rabbitmq_cookie=<somevalue>` - This value is unused in a standalone deployment but is critical for clustered deployments.
This acts as the secret key that allows RabbitMQ cluster members to identify each other.
- `rabbitmq_use_long_name` - RabbitMQ is pretty sensitive to what each instance is named. We are flexible enough to allow FQDNs
- `rabbitmq_use_long_names` - RabbitMQ is pretty sensitive to what each instance is named. We are flexible enough to allow FQDNs
(host01.example.com), short names (host01), or ip addresses (192.168.5.73). Depending on what is used to identify each host
in the `inventory` file this value may need to be changed. For FQDNs and ip addresses this value needs to be `true`. For short
names it should be `false`
- `rabbitmq_enable_manager` - Setting this to `true` will expose the RabbitMQ management web console on each node.
- `rabbitmq_enable_manager` - Setting this to `true` will expose the RabbitMQ management web console on each instance.
The most important field to point out for variability is `rabbitmq_use_long_name`. That's something we can't detect or provide a reasonable
default for so it's important to point out when it needs to be changed.
default for so it's important to point out when it needs to be changed. If instances are provisioned such that they reference other instances
internally and not on external addresses, then `rabbitmq_use_long_name` semantics should follow the internal addressing (aka `rabbitmq_host`).
Other than `rabbitmq_use_long_name` the defaults are pretty reasonable:
```
@ -77,42 +112,54 @@ rabbitmq_use_long_name=false
rabbitmq_enable_manager=false
```
### Provisioning and Deprovisioning Nodes
### Provisioning and Deprovisioning Instances and Groups
* Provisioning
Provisioning Nodes after installation is supported by updating the `inventory` file and re-running the setup playbook. It's important that this file
contain all passwords and information used when installing the cluster or other nodes may be reconfigured (This could be intentional)
Provisioning Instances after installation is supported by updating the `inventory` file and re-running the setup playbook. It's important that this file
contain all passwords and information used when installing the cluster or other instances may be reconfigured (This could be intentional)
* Deprovisioning
Tower does not automatically de-provision nodes since we can't distinguish between a node that was taken offline intentionally or due to failure.
Instead the procedure for deprovisioning a node is to shut it down (or stop the `ansible-tower-service`) and run the Tower deprovision command:
Tower does not automatically de-provision instances since we can't distinguish between an instance that was taken offline intentionally and one that has failed.
Instead the procedure for deprovisioning an instance is to shut it down (or stop the `ansible-tower-service`) and run the Tower deprovision command:
```
$ tower-manage deprovision-node <nodename>
```
* Removing/Deprovisioning Instance Groups
Tower does not automatically de-provision or remove instance groups, even though re-provisioning will often cause these to be unused. They may still
show up in API endpoints and stats monitoring. These groups can be removed with the following command:
```
$ tower-manage unregister_queue --queuename=<name>
```
### Status and Monitoring
Tower itself reports as much status as it can via the api at `/api/v1/ping` in order to provide validation of the health
Tower itself reports as much status as it can via the API at `/api/v2/ping` in order to provide validation of the health
of the Cluster. This includes:
* The node servicing the HTTP request
* The last heartbeat time of all other nodes in the cluster
* The state of the Job Queue, any jobs each node is running
* The instance servicing the HTTP request
* The last heartbeat time of all other instances in the cluster
* The state of the Job Queue
* The RabbitMQ cluster status
* Instance Groups and Instance membership in those groups
### Node Services and Failure Behavior
A more detailed view of Instances and Instance Groups, including running jobs and membership
information, can be seen at `/api/v2/instances/` and `/api/v2/instance_groups`.
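As a rough monitoring sketch (not a definitive client), the endpoints above can be polled like this; the response keys used here are assumptions about the payload described in this section:
```
# Assumed sketch: poll the ping endpoint and print heartbeats and group membership.
# Keys such as 'active_node', 'instances', 'instance_groups', 'node', and
# 'heartbeat' are assumptions, not guaranteed field names.
import requests

def cluster_health(base_url):
    ping = requests.get('{}/api/v2/ping/'.format(base_url), verify=False).json()
    print('serviced by: {}'.format(ping.get('active_node')))
    for instance in ping.get('instances', []):
        print('{}: last heartbeat {}'.format(instance.get('node'),
                                             instance.get('heartbeat')))
    for group in ping.get('instance_groups', []):
        print('group {}: {}'.format(group.get('name'), group.get('instances')))

cluster_health('https://tower.example.com')   # placeholder hostname
```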
Each Tower node is made up of several different services working collaboratively:
### Instance Services and Failure Behavior
Each Tower instance is made up of several different services working collaboratively:
* HTTP Services - This includes the Tower application itself as well as external web services.
* Callback Receiver - Whose job it is to receive job events from running Ansible jobs.
* Celery - The worker queue, that processes and runs all jobs.
* RabbitMQ - Message Broker; this is used as a signaling mechanism for Celery as well as for any event data propagated to the application.
* Memcached - local caching service for the node it lives on.
* Memcached - local caching service for the instance it lives on.
Tower is configured in such a way that if any of these services or their components fail then all services are restarted. If these fail sufficiently
often in a short span of time then the entire node will be placed offline in an automated fashion in order to allow remediation without causing unexpected
often in a short span of time then the entire instance will be placed offline in an automated fashion in order to allow remediation without causing unexpected
behavior.
### Job Runtime Behavior
@ -120,56 +167,72 @@ behavior.
Ideally a regular user of Tower should not notice any semantic difference in the way jobs are run and reported. Behind the scenes it's worth
pointing out the differences in how the system behaves.
When a job is submitted from the API interface it gets pushed into the Celery queue on RabbitMQ. A single RabbitMQ node is the responsible master for
individual queues but each Tower node will connect to and receive jobs from that queue using a Fair scheduling algorithm. Any node in the cluster is just
as likely to receive the work and execute the task. If a node fails while executing jobs then the work is marked as permanently failed.
When a job is submitted from the API interface it gets pushed into the Celery queue on RabbitMQ. A single RabbitMQ instance is the responsible master for
individual queues but each Tower instance will connect to and receive jobs from that queue using a Fair scheduling algorithm. Any instance in the cluster is
just as likely to receive the work and execute the task. If an instance fails while executing jobs then the work is marked as permanently failed.
As Tower nodes are brought online it effectively expands the work capacity of the Tower system which is measured as one entire unit (the cluster's capacity).
Conversely de-provisioning a node will remove capacity from the cluster.
If a cluster is divided into separate Instance Groups then the behavior is similar to the cluster as a whole. If two instances are assigned to a group then
either one is just as likely to receive a job as any other in the same group.
It's important to note that not all nodes are required to be provisioned with an equal capacity.
As Tower instances are brought online it effectively expands the work capacity of the Tower system. If those instances are also placed into Instance Groups then
they also expand that group's capacity. If an instance is performing work and it is a member of multiple groups then capacity will be reduced from all groups for
which it is a member. De-provisioning an instance will remove capacity from the cluster wherever that instance was assigned.
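A minimal sketch of that bookkeeping, assuming an instance's committed impact is charged against every group it belongs to (hostnames, capacities, and impact numbers below are illustrative, not taken from the Tower code base):
```
# Toy capacity model: a busy instance reduces the remaining capacity of
# every instance group it is a member of. All values are made up.
group_membership = {
    'tower': ['hostA', 'hostB', 'hostC'],
    'instance_group_east': ['hostB', 'hostC'],
}
instance_capacity = {'hostA': 100, 'hostB': 100, 'hostC': 50}
committed_impact = {'hostB': 30}   # hostB is currently running a job of impact 30

def remaining_capacity(group):
    return sum(instance_capacity[i] - committed_impact.get(i, 0)
               for i in group_membership[group])

print(remaining_capacity('tower'))                # 220
print(remaining_capacity('instance_group_east'))  # 120
```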
Project updates behave differently than they did before. Previously they were ordinary jobs that ran on a single node. It's now important that
they run successfully on any node that could potentially run a job. Project's will now sync themselves to the correct version on the node immediately
It's important to note that not all instances are required to be provisioned with an equal capacity.
Project updates behave differently than they did before. Previously they were ordinary jobs that ran on a single instance. It's now important that
they run successfully on any instance that could potentially run a job. Projects will now sync themselves to the correct version on the instance immediately
prior to running the job.
If an Instance Group is configured but all instances in that group are offline or unavailable, any jobs that are launched targeting only that group will be stuck
in a waiting state until instances become available. Fallback or backup resources should be provisioned to handle any work that might encounter this scenario.
## Acceptance Criteria
When verifying acceptance we should ensure the following statements are true
* Tower should install as a standalone Node
* Tower should install as a standalone Instance
* Tower should install in a Clustered fashion
* Instances should, optionally, be able to be grouped arbitrarily into different Instance Groups
* Capacity should be tracked at the group level and capacity impact should make sense relative to what instance a job is
running on and what groups that instance is a member of.
* Provisioning should be supported via the setup playbook
* De-provisioning should be supported via a management command
* All jobs, inventory updates, and project updates should run successfully
* Jobs should be able to run on all hosts
* Jobs should only run on the hosts they are targeted at. If assigned implicitly or directly to groups then they should
only run on instances in those Instance Groups.
* Project updates should manifest their data on the host that will run the job immediately prior to the job running
* Tower should be able to reasonably survive the removal of all nodes in the cluster
* Tower should be able to reasonably survive the removal of all instances in the cluster
* Tower should behave in a predictable fashion during network partitioning
## Testing Considerations
* Basic testing should be able to demonstrate parity with a standalone node for all integration testing.
* Basic testing should be able to demonstrate parity with a standalone instance for all integration testing.
* Basic playbook testing to verify routing differences, including:
- Basic FQDN
- Short-name name resolution
- ip addresses
- /etc/hosts static routing information
* We should test behavior of large and small clusters. I would envision small clusters as 2 - 3 nodes and large
clusters as 10 - 15 nodes
* Failure testing should involve killing single nodes and killing multiple nodes while the cluster is performing work.
* We should test behavior of large and small clusters. I would envision small clusters as 2 - 3 instances and large
clusters as 10 - 15 instances
* Failure testing should involve killing single instances and killing multiple instances while the cluster is performing work.
Job failures during the time period should be predictable and not catastrophic.
* Node downtime testing should also include recoverability testing. Killing single services and ensuring the system can
* Instance downtime testing should also include recoverability testing. Killing single services and ensuring the system can
return itself to a working state
* Persistent failure should be tested by killing single services in such a way that the cluster node cannot be recovered
and ensuring that the node is properly taken offline
* Persistent failure should be tested by killing single services in such a way that the cluster instance cannot be recovered
and ensuring that the instance is properly taken offline
* Network partitioning failures will be important also. In order to test this
- Disallow a single node from communicating with the other nodes but allow it to communicate with the database
- Break the link between nodes such that it forms 2 or more groups where groupA and groupB can't communicate but all nodes
- Disallow a single instance from communicating with the other instances but allow it to communicate with the database
- Break the link between instances such that it forms 2 or more groups where groupA and groupB can't communicate but all instances
can communicate with the database.
* Crucially when network partitioning is resolved all nodes should recover into a consistent state
* Crucially when network partitioning is resolved all instances should recover into a consistent state
* Upgrade Testing, verify behavior before and after are the same for the end user.
* Project Updates should be thoroughly tested for all scm types (git, svn, hg) and for manual projects.
* Setting up instance groups in two scenarios:
a) instances are shared between groups
b) instances are isolated to particular groups
Organizations, Inventories, and Job Templates should be variously assigned to one or many groups and jobs should execute
in those groups in preferential order as resources are available.
## Performance Testing

View File

@ -14,19 +14,10 @@ The `schedule()` function is ran (a) periodically by a celery task and (b) on jo
### Scheduler Algorithm
* Get all non-completed jobs, `all_tasks`
* Generate the hash tables from `all_tasks`:
* `<job_template_id, True/False>` indicates a job is running
* `<project_id, True/False>` indicates a project update is running
* `<inventory_id, True/False>` indicates a job template or inventory update is running
* `<inventory_source_id, True/False>` indicates an inventory update is running
* `<workflow_job_template_id, True/False>` indicates a workflow job is running
* `<project_id, latest_project_update_partial>` used to determine cache timeout
* `<inventory_id, [ inventory_source_partial, ... ]>` used to determine cache timeout and dependencies to spawn
* `<inventory_source_id, latest_inventory_update_partial>` used to determine cache timeout
* Detect finished workflow jobs
* Spawn next workflow jobs if needed
* For each pending job; start with the oldest created job and stop when capacity == 0
* If the job is not blocked, determined using generated hash tables, and there is capacity, then mark the job as `waiting` and submit it to celery.
* For each pending job; start with the oldest created job
* If the job is not blocked, and there is capacity in the instance group queue, then mark the job as `waiting` and submit it to celery.
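A self-contained, pseudocode-flavored sketch of that pass follows; the class, function, and field names are illustrative and do not mirror the Tower source:
```
# Illustrative only: walk pending jobs oldest-first and start the ones that
# are unblocked and fit in their instance group's remaining capacity.
from collections import namedtuple

Job = namedtuple('Job', 'id created impact group blocked')
Group = namedtuple('Group', 'name capacity')

def schedule(pending_jobs, groups, committed):
    started = []
    for job in sorted(pending_jobs, key=lambda j: j.created):
        if job.blocked:
            continue
        group = groups[job.group]   # the instance group the job is set to run in
        free = group.capacity - committed.get(group.name, 0)
        if free >= job.impact or committed.get(group.name, 0) == 0:
            # an idle group may always start one job, even without enough capacity
            committed[group.name] = committed.get(group.name, 0) + job.impact
            started.append(job.id)  # real code would mark 'waiting' and submit to celery
    return started

groups = {'tower': Group('tower', 100), 'east': Group('east', 20)}
jobs = [Job(1, 1, 50, 'tower', False), Job(2, 2, 80, 'tower', False),
        Job(3, 3, 30, 'east', False)]
print(schedule(jobs, groups, {}))   # [1, 3]: job 2 exceeds the remaining 'tower' capacity
```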
### Job Lifecycle
| Job Status | State |
@ -41,11 +32,8 @@ The `schedule()` function is ran (a) periodically by a celery task and (b) on jo
## Code Composition
The main goal of the new task manager is to run in our HA environment. This translates to making the task manager logic run on any Tower node. To support this we need to remove any reliance on state between task manager schedule logic runs. We had a secondary goal in mind of designing the task manager to have limited/no access to the database for the future federation feature. This secondary requirement combined with performance needs led us to create partial models that wrap the database model data as dicts.
### Partials
Partials wrap a subset of Django model dict data, provide a simple static query method that is purpose-built to support populating the task manager hash tables, have a link back to the model they wrap so that the original Django ORM model can easily be retrieved, and can be made serializable via `<type, self.data>` since `self.data` is a `dict` of the database record.
### Blocking Logic
The blocking logic has been moved from the respective ORM model to the code that manages the dependency hash tables. The blocking logic could easily be moved to the partials or even the ORM models. However, the per-model blocking logic code would be operating on the dependency hash tables; not on ORM models as in the previous design. The blocking logic is kept close to the data-structures required to operate on.
The blocking logic is handled by a mixture of ORM instance references and task manager local tracking data in the scheduler instance.
## Acceptance Tests
@ -53,8 +41,8 @@ The new task manager should, basically, work like the old one. Old task manager
### Task Manager Rules
* Groups of blocked tasks run in chronological order
* Tasks that are not blocked run whenever there is capacity available***
* ***1 job is always allowed to run, even if there isn't enough capacity.
* Tasks that are not blocked run whenever there is capacity available in the instance group they are set to run in***
* ***1 job is always allowed to run per instance group, even if there isn't enough capacity.
* Only 1 Project Update for a Project may be running
* Only 1 Inventory Update for an Inventory Source may be running
* For a related Project, only a Job xor Project Update may be running
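A toy illustration of a few of these rules, assuming tasks are plain dicts rather than the actual scheduler data structures:
```
# Hypothetical helper: decide whether a task is blocked by something already
# running or waiting, per the rules above. Task dicts are illustrative.
def is_blocked(task, active):
    for other in active:
        same_project = other.get('project_id') == task.get('project_id')
        if task['type'] == 'project_update' and same_project \
                and other['type'] in ('project_update', 'job'):
            return True   # one project update per project; job xor project update
        if task['type'] == 'job' and same_project and other['type'] == 'project_update':
            return True   # a job waits for its project's update to finish
        if task['type'] == 'inventory_update' and other['type'] == 'inventory_update' \
                and other.get('inventory_source_id') == task.get('inventory_source_id'):
            return True   # one inventory update per inventory source
    return False

active = [{'type': 'project_update', 'project_id': 1}]
print(is_blocked({'type': 'job', 'project_id': 1}, active))   # True
print(is_blocked({'type': 'job', 'project_id': 2}, active))   # False
```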

View File

@ -35,7 +35,7 @@ services:
dockerfile: Dockerfile-logstash
# Postgres Database Container
postgres:
image: postgres:9.4.1
image: postgres:9.6
memcached:
image: memcached:alpine
ports:

View File

@ -4,7 +4,7 @@ minfds = 4096
nodaemon=true
[program:celeryd]
command = python manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=/celerybeat-schedule -Q projects,jobs,default,scheduler,broadcast_all,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s
command = python manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=/celerybeat-schedule -Q tower_scheduler,tower_broadcast_all,tower,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s
autostart = true
autorestart = true
redirect_stderr=true