
Workflow RBAC and prompting basic changes

This commit is contained in:
AlanCoding 2016-09-20 17:16:26 -04:00
parent b7a6aa01a3
commit 2ffa7a91ec
7 changed files with 409 additions and 118 deletions

View File

@ -110,6 +110,20 @@ def reverse_gfk(content_object):
camelcase_to_underscore(content_object.__class__.__name__): content_object.get_absolute_url()
}
def vars_validate_or_raise(vars_str):
# vars must be blank, a valid JSON or YAML dict, or ...
try:
json.loads((vars_str or '').strip() or '{}')
return vars_str
except ValueError:
pass
try:
yaml.safe_load(vars_str)
return vars_str
except yaml.YAMLError:
pass
raise serializers.ValidationError('Must be valid JSON or YAML.')
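For illustration, a standalone sketch of the behavior this shared helper consolidates (re-implemented outside the serializer module, with a plain boolean in place of DRF's ValidationError; the sample strings are made up):

import json
import yaml

def _vars_ok(vars_str):
    """Return True if vars_str is blank, valid JSON, or valid YAML -- the cases the helper accepts."""
    try:
        json.loads((vars_str or '').strip() or '{}')
        return True
    except ValueError:
        pass
    try:
        yaml.safe_load(vars_str)
        return True
    except yaml.YAMLError:
        return False

assert _vars_ok('')            # blank is allowed
assert _vars_ok('{"a": 1}')    # valid JSON
assert _vars_ok('a: 1')        # valid YAML
assert not _vars_ok('{foo')    # neither JSON nor YAML -> ValidationError in the serializer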
class BaseSerializerMetaclass(serializers.SerializerMetaclass):
'''
@ -996,14 +1010,7 @@ class ProjectUpdateCancelSerializer(ProjectUpdateSerializer):
class BaseSerializerWithVariables(BaseSerializer):
def validate_variables(self, value):
try:
json.loads(value.strip() or '{}')
except ValueError:
try:
yaml.safe_load(value)
except yaml.YAMLError:
raise serializers.ValidationError('Must be valid JSON or YAML.')
return value
return vars_validate_or_raise(value)
class InventorySerializer(BaseSerializerWithVariables):
@ -1326,18 +1333,7 @@ class InventorySourceOptionsSerializer(BaseSerializer):
return res
def validate_source_vars(self, value):
# source_env must be blank, a valid JSON or YAML dict, or ...
try:
json.loads((value or '').strip() or '{}')
return value
except ValueError:
pass
try:
yaml.safe_load(value)
return value
except yaml.YAMLError:
pass
raise serializers.ValidationError('Must be valid JSON or YAML.')
return vars_validate_or_raise(value)
def validate(self, attrs):
# TODO: Validate source, validate source_regions
@ -1900,18 +1896,7 @@ class JobTemplateSerializer(UnifiedJobTemplateSerializer, JobOptionsSerializer):
return super(JobTemplateSerializer, self).validate(attrs)
def validate_extra_vars(self, value):
# extra_vars must be blank, a valid JSON or YAML dict, or ...
try:
json.loads((value or '').strip() or '{}')
return value
except ValueError:
pass
try:
yaml.safe_load(value)
return value
except yaml.YAMLError:
pass
raise serializers.ValidationError('Must be valid JSON or YAML.')
return vars_validate_or_raise(value)
class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
@ -2178,7 +2163,7 @@ class SystemJobCancelSerializer(SystemJobSerializer):
class WorkflowJobTemplateSerializer(UnifiedJobTemplateSerializer):
class Meta:
model = WorkflowJobTemplate
fields = ('*',)
fields = ('*', 'extra_vars', 'organization')
def get_related(self, obj):
res = super(WorkflowJobTemplateSerializer, self).get_related(obj)
@ -2195,6 +2180,9 @@ class WorkflowJobTemplateSerializer(UnifiedJobTemplateSerializer):
))
return res
def validate_extra_vars(self, value):
return vars_validate_or_raise(value)
# TODO:
class WorkflowJobTemplateListSerializer(WorkflowJobTemplateSerializer):
pass
@ -2226,10 +2214,15 @@ class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer)
pass
class WorkflowNodeBaseSerializer(BaseSerializer):
job_type = serializers.SerializerMethodField()
job_tags = serializers.SerializerMethodField()
limit = serializers.SerializerMethodField()
skip_tags = serializers.SerializerMethodField()
class Meta:
# TODO: workflow_job and job read-only
fields = ('id', 'url', 'related', 'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',)
fields = ('id', 'url', 'related', 'unified_job_template',
'inventory', 'credential', 'job_type', 'job_tags', 'skip_tags', 'limit')
read_only_fields = ('success_nodes', 'failure_nodes', 'always_nodes')
def get_related(self, obj):
res = super(WorkflowNodeBaseSerializer, self).get_related(obj)
@ -2237,6 +2230,19 @@ class WorkflowNodeBaseSerializer(BaseSerializer):
res['unified_job_template'] = obj.unified_job_template.get_absolute_url()
return res
def get_job_type(self, obj):
return obj.char_prompts.get('job_type', None)
def get_job_tags(self, obj):
return obj.char_prompts.get('job_tags', None)
def get_skip_tags(self, obj):
return obj.char_prompts.get('skip_tags', None)
def get_limit(self, obj):
return obj.char_prompts.get('limit', None)
class WorkflowJobTemplateNodeSerializer(WorkflowNodeBaseSerializer):
class Meta:
model = WorkflowJobTemplateNode
@ -2251,9 +2257,36 @@ class WorkflowJobTemplateNodeSerializer(WorkflowNodeBaseSerializer):
res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
return res
def to_internal_value(self, data):
internal_value = super(WorkflowNodeBaseSerializer, self).to_internal_value(data)
char_prompts = self.extract_char_prompts(data)
internal_value['char_prompts'] = char_prompts
return internal_value
def extract_char_prompts(self, data):
char_prompts = {}
for fd in ['job_type', 'job_tags', 'skip_tags', 'limit']:
if data.get(fd, None):
char_prompts[fd] = data[fd]
return char_prompts
def validate(self, attrs):
if 'char_prompts' in attrs:
if 'job_type' in attrs['char_prompts']:
job_types = [t for t, v in JOB_TYPE_CHOICES]
if attrs['char_prompts']['job_type'] not in job_types:
raise serializers.ValidationError({
"job_type": "%s is not a valid job type. The choices are %s." % (
attrs['char_prompts']['job_type'], job_types)})
ujt_obj = attrs.get('unified_job_template', None)
if isinstance(ujt_obj, WorkflowJobTemplate):
raise serializers.ValidationError({"unified_job_template": "Can not nest Workflow Job Templates inside of Workflow Job Templates"})
return super(WorkflowJobTemplateNodeSerializer, self).validate(attrs)
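As an illustration of the extract_char_prompts() round trip above, a standalone sketch (the field names come from this commit; the payload values are made up):

def fold_char_prompts(data, fields=('job_type', 'job_tags', 'skip_tags', 'limit')):
    """Collect any prompt fields present in the request into one char_prompts dict."""
    return {fd: data[fd] for fd in fields if data.get(fd)}

example_payload = {'unified_job_template': 5, 'job_type': 'check', 'limit': 'webservers'}
assert fold_char_prompts(example_payload) == {'job_type': 'check', 'limit': 'webservers'}

On read, the SerializerMethodFields (get_job_type, get_limit, and so on) surface those stored values back out of the char_prompts blob.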
class WorkflowJobNodeSerializer(WorkflowNodeBaseSerializer):
class Meta:
model = WorkflowJobTemplateNode
# TODO: workflow_job and job read-only
model = WorkflowJobNode
fields = ('*', 'job', 'workflow_job',)
def get_related(self, obj):

View File

@ -2613,34 +2613,45 @@ class JobTemplateObjectRolesList(SubListAPIView):
content_type = ContentType.objects.get_for_model(self.parent_model)
return Role.objects.filter(content_type=content_type, object_id=po.pk)
# TODO:
class WorkflowJobNodeList(ListCreateAPIView):
class WorkflowJobNodeList(ListAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeListSerializer
new_in_310 = True
# TODO:
class WorkflowJobNodeDetail(RetrieveUpdateDestroyAPIView):
class WorkflowJobNodeDetail(RetrieveAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeDetailSerializer
new_in_310 = True
# TODO:
class WorkflowJobTemplateNodeList(ListCreateAPIView):
model = WorkflowJobTemplateNode
serializer_class = WorkflowJobTemplateNodeListSerializer
new_in_310 = True
# TODO:
def update_raw_data(self, data):
for fd in ['job_type', 'job_tags', 'skip_tags', 'limit']:
data[fd] = None
return super(WorkflowJobTemplateNodeList, self).update_raw_data(data)
class WorkflowJobTemplateNodeDetail(RetrieveUpdateDestroyAPIView):
model = WorkflowJobTemplateNode
serializer_class = WorkflowJobTemplateNodeDetailSerializer
new_in_310 = True
def update_raw_data(self, data):
for fd in ['job_type', 'job_tags', 'skip_tags', 'limit']:
data[fd] = None
try:
obj = self.get_object()
data.update(obj.char_prompts)
except:
pass
return super(WorkflowJobTemplateNodeDetail, self).update_raw_data(data)
class WorkflowJobTemplateNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
@ -2732,7 +2743,10 @@ class WorkflowJobTemplateLaunch(GenericAPIView):
serializer_class = EmptySerializer
def get(self, request, *args, **kwargs):
return Response({})
data = {}
obj = self.get_object()
data['warnings'] = obj.get_warnings()
return Response(data)
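A hedged sketch of what a GET on the launch endpoint could now return; the node pks are hypothetical, and the messages are the ones built by get_warnings()/get_prompts_warnings() further down in this commit:

example_launch_response = {
    'warnings': {
        3: 'Node is missing a linked unified_job_template',
        7: {'limit': 'Workflow node provided field, but job template is not set to ask on launch'},
    }
}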
def post(self, request, *args, **kwargs):
obj = self.get_object()
@ -2749,7 +2763,6 @@ class WorkflowJobTemplateWorkflowNodesList(SubListCreateAPIView):
model = WorkflowJobTemplateNode
serializer_class = WorkflowJobTemplateNodeListSerializer
always_allow_superuser = True # TODO: RBAC
parent_model = WorkflowJobTemplate
relationship = 'workflow_job_template_nodes'
parent_key = 'workflow_job_template'
@ -2763,17 +2776,11 @@ class WorkflowJobTemplateJobsList(SubListAPIView):
relationship = 'jobs'
parent_key = 'workflow_job_template'
# TODO:
class WorkflowJobList(ListCreateAPIView):
model = WorkflowJob
serializer_class = WorkflowJobListSerializer
def get(self, request, *args, **kwargs):
if not request.user.is_superuser and not request.user.is_system_auditor:
raise PermissionDenied("Superuser privileges needed.")
return super(WorkflowJobList, self).get(request, *args, **kwargs)
# TODO:
class WorkflowJobDetail(RetrieveDestroyAPIView):

View File

@ -1141,7 +1141,12 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
return self.model.objects.all()
qs = self.model.objects.all()
else:
qs = self.model.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs
@check_superuser
def can_read(self, obj):
@ -1164,37 +1169,35 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
def can_delete(self, obj):
return self.can_change(obj, None)
# TODO:
class WorkflowJobNodeAccess(BaseAccess):
'''
I can see/use a WorkflowJobNode if I have permission to associated Workflow Job
I can see a WorkflowJobNode if I have permission to...
the workflow job template associated with...
the workflow job associated with the node.
Any deletion or editing of individual nodes would undermine the integrity
of the graph structure.
Deletion must happen as a cascade delete from the workflow job.
'''
model = WorkflowJobNode
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
return self.model.objects.all()
qs = self.model.objects.all()
else:
qs = self.model.objects.filter(
workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs
@check_superuser
def can_read(self, obj):
return True
@check_superuser
def can_add(self, data):
if not data: # So the browseable API will work
return True
return True
return False
@check_superuser
def can_change(self, obj, data):
if self.can_add(data) is False:
return False
return True
return False
def can_delete(self, obj):
return self.can_change(obj, None)
return False
# TODO:
class WorkflowJobTemplateAccess(BaseAccess):
@ -1209,7 +1212,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
qs = self.model.objects.all()
else:
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by', 'next_schedule').all()
return qs.select_related('created_by', 'modified_by', 'next_schedule',
'admin_role', 'execute_role', 'read_role').all()
@check_superuser
def can_read(self, obj):
@ -1224,61 +1228,79 @@ class WorkflowJobTemplateAccess(BaseAccess):
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
'''
if not data: # So the browseable API will work
return True
return Organization.accessible_objects(self.user, 'admin_role').exists()
# if reference_obj is provided, determine if it can be copied
reference_obj = data.pop('reference_obj', None)
if 'survey_enabled' in data and data['survey_enabled']:
self.check_license(feature='surveys')
if self.user.is_superuser:
if reference_obj:
for node in reference_obj.workflow_job_template_nodes.all():
if node.inventory and self.user not in node.inventory.use_role:
return False
if node.credential and self.user not in node.credential.use_role:
return False
if node.unified_job_template:
if isinstance(node.unified_job_template, SystemJobTemplate):
if not self.user.is_superuser:
return False
elif isinstance(node.unified_job_template, JobTemplate):
if self.user not in node.unified_job_template.execute_role:
return False
elif isinstance(node.unified_job_template, Project):
if self.user not in node.unified_job_template.update_role:
return False
elif isinstance(node.unified_job_template, InventorySource):
if not self.user.can_access(InventorySource, 'start', node.unified_job_template):
return False
else:
return False
return True
def get_value(Class, field):
if reference_obj:
return getattr(reference_obj, field, None)
else:
pk = get_pk_from_dict(data, field)
if pk:
return get_object_or_400(Class, pk=pk)
else:
return None
# will check this if surveys are added to WFJT
# if 'survey_enabled' in data and data['survey_enabled']:
# self.check_license(feature='surveys')
return False
org_pk = get_pk_from_dict(data, 'organization')
if not org_pk:
# only superusers can create or manage orphan WFJTs
return self.user.is_superuser
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
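Condensed, illustrative restatement of the organization rule in can_add() above (boolean flags stand in for the real user and role objects):

def wfjt_can_add(is_superuser, is_org_admin, org_pk):
    if org_pk is None:
        # only superusers can create or manage orphan WFJTs
        return is_superuser
    return is_org_admin

assert wfjt_can_add(is_superuser=False, is_org_admin=True, org_pk=42)
assert not wfjt_can_add(is_superuser=False, is_org_admin=False, org_pk=42)
assert wfjt_can_add(is_superuser=True, is_org_admin=False, org_pk=None)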
def can_start(self, obj, validate_license=True):
# TODO: Are workflows allowed for all licenses ??
# Check license.
'''
if validate_license:
# check basic license, node count
self.check_license()
if obj.job_type == PERM_INVENTORY_SCAN:
self.check_license(feature='system_tracking')
if obj.survey_enabled:
self.check_license(feature='surveys')
'''
# if surveys are added to WFJTs, check license here
# if obj.survey_enabled:
# self.check_license(feature='surveys')
# Super users can start any job
if self.user.is_superuser:
return True
return self.can_read(obj)
# TODO: We should use execute role rather than read role
#return self.user in obj.execute_role
return self.user in obj.execute_role
def can_change(self, obj, data):
data_for_change = data
if self.user not in obj.admin_role and not self.user.is_superuser:
return False
if data is not None:
data = dict(data)
# # Check survey license if surveys are added to WFJTs
# if 'survey_enabled' in data and obj.survey_enabled != data['survey_enabled'] and data['survey_enabled']:
# self.check_license(feature='surveys')
if 'survey_enabled' in data and obj.survey_enabled != data['survey_enabled'] and data['survey_enabled']:
self.check_license(feature='surveys')
if self.user.is_superuser:
return True
return self.can_read(obj) and self.can_add(data_for_change)
org_pk = get_pk_from_dict(data, 'organization')
if ('organization' not in data or
(org_pk is None and obj.organization is None) or
(obj.organization and obj.organization.pk == org_pk)):
# The simple case
return self.user in obj.admin_role
# If it already has an organization set, must be admin of the org to change it
if obj.organization and self.user not in obj.organization.admin_role:
return False
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
def can_delete(self, obj):
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
@ -1295,10 +1317,35 @@ class WorkflowJobTemplateAccess(BaseAccess):
class WorkflowJobAccess(BaseAccess):
'''
I can only see Workflow Jobs if I'm a super user
I can only see Workflow Jobs if I can see the associated
workflow job template that it was created from.
'''
model = WorkflowJob
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
qs = self.model.objects.all()
else:
qs = WorkflowJob.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs.select_related('created_by', 'modified_by')
def can_add(self, data):
# Old add-start system for launching jobs is being deprecated, and
# not supported for new types of resources
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
if obj.workflow_job_template is None:
# only superusers can delete orphaned workflow jobs
return self.user.is_superuser
return self.user in obj.workflow_job_template.admin_role
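Illustrative summary of the WorkflowJob access rules above, plus a minimal sketch of the delete rule (flags stand in for role membership; this is not the access class itself):

#   view    -> read_role on the workflow job template the job was created from
#   add     -> never via the old add-then-start API
#   change  -> never
#   delete  -> admin_role on the parent WFJT; superuser only for orphaned jobs
def workflow_job_can_delete(is_superuser, is_wfjt_admin, has_wfjt):
    if not has_wfjt:
        # only superusers can delete orphaned workflow jobs
        return is_superuser
    return is_wfjt_admin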
class AdHocCommandAccess(BaseAccess):
'''
I can only see/run ad hoc commands when:

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import jsonfield.fields
import django.db.models.deletion
import awx.main.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0035_v310_jobevent_uuid'),
]
operations = [
migrations.AddField(
model_name='workflowjobnode',
name='char_prompts',
field=jsonfield.fields.JSONField(default={}, blank=True),
),
migrations.AddField(
model_name='workflowjobnode',
name='credential',
field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Credential', null=True),
),
migrations.AddField(
model_name='workflowjobnode',
name='inventory',
field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Inventory', null=True),
),
migrations.AddField(
model_name='workflowjobtemplate',
name='execute_role',
field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'admin_role'], to='main.Role', null=b'True'),
),
migrations.AddField(
model_name='workflowjobtemplate',
name='organization',
field=models.ForeignKey(related_name='workflows', on_delete=django.db.models.deletion.SET_NULL, blank=True, to='main.Organization', null=True),
),
migrations.AddField(
model_name='workflowjobtemplate',
name='read_role',
field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'singleton:system_auditor', b'organization.auditor_role', b'execute_role', b'admin_role'], to='main.Role', null=b'True'),
),
migrations.AddField(
model_name='workflowjobtemplatenode',
name='char_prompts',
field=jsonfield.fields.JSONField(default={}, blank=True),
),
migrations.AddField(
model_name='workflowjobtemplatenode',
name='credential',
field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Credential', null=True),
),
migrations.AddField(
model_name='workflowjobtemplatenode',
name='inventory',
field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Inventory', null=True),
),
migrations.AlterField(
model_name='workflowjobnode',
name='unified_job_template',
field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, to='main.UnifiedJobTemplate', null=True),
),
migrations.AlterField(
model_name='workflowjobnode',
name='workflow_job',
field=models.ForeignKey(related_name='workflow_job_nodes', default=None, blank=True, to='main.WorkflowJob', null=True),
),
migrations.AlterField(
model_name='workflowjobtemplate',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'singleton:system_administrator', b'organization.admin_role'], to='main.Role', null=b'True'),
),
migrations.AlterField(
model_name='workflowjobtemplatenode',
name='unified_job_template',
field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, to='main.UnifiedJobTemplate', null=True),
),
]

View File

@ -51,7 +51,7 @@ role_descriptions = {
'adhoc_role' : 'May run ad hoc commands on an inventory',
'admin_role' : 'Can manage all aspects of the %s',
'auditor_role' : 'Can view all settings for the %s',
'execute_role' : 'May run the job template',
'execute_role' : 'May run the %s',
'member_role' : 'User is a member of the %s',
'read_role' : 'May view settings for the %s',
'update_role' : 'May update project or inventory or group using the configured source update system',

View File

@ -9,28 +9,28 @@ from django.db import models
from django.core.urlresolvers import reverse
#from django import settings as tower_settings
from jsonfield import JSONField
# AWX
from awx.main.models import UnifiedJobTemplate, UnifiedJob
from awx.main.models.notifications import JobNotificationMixin
from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty
from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
ROLE_SINGLETON_SYSTEM_AUDITOR
)
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
__all__ = ['WorkflowJobTemplate', 'WorkflowJob', 'WorkflowJobOptions', 'WorkflowJobNode', 'WorkflowJobTemplateNode',]
CHAR_PROMPTS_LIST = ['job_type', 'job_tags', 'skip_tags', 'limit']
class WorkflowNodeBase(CreatedModifiedModel):
class Meta:
abstract = True
app_label = 'main'
# TODO: RBAC
'''
admin_role = ImplicitRoleField(
parent_role='workflow_job_template.admin_role',
)
'''
success_nodes = models.ManyToManyField(
'self',
blank=True,
@ -52,11 +52,68 @@ class WorkflowNodeBase(CreatedModifiedModel):
unified_job_template = models.ForeignKey(
'UnifiedJobTemplate',
related_name='%(class)ss',
blank=False,
null=True,
default=None,
on_delete=models.SET_NULL,
)
# Prompting-related fields
inventory = models.ForeignKey(
'Inventory',
related_name='%(class)ss',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
)
credential = models.ForeignKey(
'Credential',
related_name='%(class)ss',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
)
char_prompts = JSONField(
blank=True,
default={}
)
def prompts_dict(self):
data = {}
if self.inventory:
data['inventory'] = self.inventory
if self.credential:
data['credential'] = self.credential
for fd in CHAR_PROMPTS_LIST:
if fd in self.char_prompts:
data[fd] = self.char_prompts[fd]
return data
def get_prompts_warnings(self):
ujt_obj = self.unified_job_template
if ujt_obj is None:
return {}
prompts_dict = self.prompts_dict()
from awx.main.models import JobTemplate
if not isinstance(ujt_obj, JobTemplate):
return {'ignored': {'all': 'Cannot use prompts on a unified_job_template that is not a job template'}}
ask_for_vars_dict = ujt_obj._ask_for_vars_dict()
ignored_dict = {}
missing_dict = {}
for fd in prompts_dict:
if not ask_for_vars_dict[fd]:
ignored_dict[fd] = 'Workflow node provided field, but job template is not set to ask on launch'
for fd in ask_for_vars_dict:
ujt_field = getattr(ujt_obj, fd)
if ujt_field is None and prompts_dict.get(fd, None) is None:
missing_dict[fd] = 'Job Template does not have this field and workflow node does not provide it'
data = {}
if ignored_dict:
data.update(ignored_dict)
if missing_dict:
data.update(missing_dict)
return data
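A hedged example of the classification above (the field names and messages come from this method; the scenario itself is made up): a node that supplies 'limit' while its job template is not set to ask for it on launch, and whose template also has no inventory that either side provides, would produce

example_node_warnings = {
    'limit': 'Workflow node provided field, but job template is not set to ask on launch',
    'inventory': 'Job Template does not have this field and workflow node does not provide it',
}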
class WorkflowJobTemplateNode(WorkflowNodeBase):
# TODO: Ensure the API forces workflow_job_template being set
@ -87,7 +144,7 @@ class WorkflowJobNode(WorkflowNodeBase):
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
on_delete=models.CASCADE,
)
def get_absolute_url(self):
@ -102,14 +159,32 @@ class WorkflowJobOptions(BaseModel):
default='',
)
class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions):
class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, ResourceMixin):
class Meta:
app_label = 'main'
admin_role = ImplicitRoleField(
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
# admin_role = ImplicitRoleField(
# parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
# )
organization = models.ForeignKey(
'Organization',
blank=True,
null=True,
on_delete=models.SET_NULL,
related_name='workflows',
)
admin_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
'organization.admin_role'
])
execute_role = ImplicitRoleField(parent_role=[
'admin_role'
])
read_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
'organization.auditor_role', 'execute_role', 'admin_role'
])
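Illustrative summary of the implicit role graph the three fields above define (membership flows down from parent to child roles; a sketch, not ImplicitRoleField itself):

WFJT_ROLE_PARENTS = {
    'admin_role':   ['singleton:system_administrator', 'organization.admin_role'],
    'execute_role': ['admin_role'],
    'read_role':    ['singleton:system_auditor', 'organization.auditor_role',
                     'execute_role', 'admin_role'],
}
# e.g. an organization admin picks up admin_role, so checks like
# `self.user in obj.execute_role` in the access code also pass for them.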
@classmethod
def _get_unified_job_class(cls):
@ -146,6 +221,17 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions):
workflow_job.inherit_job_template_workflow_nodes()
return workflow_job
def get_warnings(self):
warning_data = {}
for node in self.workflow_job_template_nodes.all():
if node.unified_job_template is None:
warning_data[node.pk] = 'Node is missing a linked unified_job_template'
continue
node_prompts_warnings = node.get_prompts_warnings()
if node_prompts_warnings:
warning_data[node.pk] = node_prompts_warnings
return warning_data
class WorkflowJobInheritNodesMixin(object):
def _inherit_relationship(self, old_node, new_node, node_ids_map, node_type):
old_related_nodes = self._get_all_by_type(old_node, node_type)
@ -159,7 +245,27 @@ class WorkflowJobInheritNodesMixin(object):
Create a WorkflowJobNode for each WorkflowJobTemplateNode
'''
def _create_workflow_job_nodes(self, old_nodes):
return [WorkflowJobNode.objects.create(workflow_job=self, unified_job_template=old_node.unified_job_template) for old_node in old_nodes]
new_node_list = []
for old_node in old_nodes:
kwargs = dict(
workflow_job=self,
unified_job_template=old_node.unified_job_template,
)
ujt_obj = old_node.unified_job_template
if ujt_obj:
ask_for_vars_dict = ujt_obj._ask_for_vars_dict()
if ask_for_vars_dict['inventory'] and old_node.inventory:
kwargs['inventory'] = old_node.inventory
if ask_for_vars_dict['credential'] and old_node.credential:
kwargs['credential'] = old_node.credential
for fd in CHAR_PROMPTS_LIST:
new_char_prompts = {}
if ask_for_vars_dict[fd] and old_node.char_prompts.get(fd, None):
new_char_prompts[fd] = old_node.char_prompts[fd]
if new_char_prompts:
kwargs['char_prompts'] = new_char_prompts
new_node_list.append(WorkflowJobNode.objects.create(**kwargs))
return new_node_list
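A standalone sketch of the prompt-copy gating applied above when spawning workflow job nodes (field names from the commit; the example values are hypothetical):

def copy_prompts(ask_for_vars, template_node_prompts):
    """Copy only the prompts the underlying job template is set to ask for on launch."""
    return {fd: val for fd, val in template_node_prompts.items() if ask_for_vars.get(fd)}

ask = {'job_type': True, 'limit': False}
assert copy_prompts(ask, {'job_type': 'check', 'limit': 'webservers'}) == {'job_type': 'check'}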
def _map_workflow_job_nodes(self, old_nodes, new_nodes):
node_ids_map = {}

View File

@ -8,6 +8,7 @@ from awx.main.access import (
BaseAccess,
check_superuser,
JobTemplateAccess,
WorkflowJobTemplateAccess,
)
from awx.main.models import Credential, Inventory, Project, Role, Organization
@ -110,3 +111,18 @@ def test_jt_can_add_bad_data(user_unit):
access = JobTemplateAccess(user_unit)
assert not access.can_add({'asdf': 'asdf'})
class TestWorkflowAccessMethods:
@pytest.fixture
def workflow(self, workflow_job_template_factory):
objects = workflow_job_template_factory('test_workflow', persisted=False)
return objects.workflow_job_template
class MockQuerySet(object):
pass
def test_workflow_can_add(self, workflow, user_unit):
# user_unit.admin_of_organizations = self.MockQuerySet()
access = WorkflowJobTemplateAccess(user_unit)
assert access.can_add({'organization': 1})
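A possible companion check, not part of this commit, sketched against the can_add() organization rule shown earlier (it assumes the user_unit fixture is a plain, non-superuser user):

def test_workflow_orphan_add_requires_superuser(self, user_unit):
    # with no organization in the payload, can_add() falls through to
    # `return self.user.is_superuser`, so a regular user is refused
    access = WorkflowJobTemplateAccess(user_unit)
    assert not access.can_add({'name': 'orphan workflow'})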