diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 660c105200..ecd725bc6e 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -209,6 +209,8 @@ class ProjectUpdatePermission(ModelAccessPermission): class UserPermission(ModelAccessPermission): def check_post_permissions(self, request, view, obj=None): - if request.user.is_superuser: + if not request.data: + return request.user.admin_of_organizations.exists() + elif request.user.is_superuser: return True raise PermissionDenied() diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 63533eb141..5b45c9de5b 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -337,7 +337,7 @@ class BaseSerializer(serializers.ModelSerializer): if hasattr(self, 'show_capabilities'): view = self.context.get('view', None) parent_obj = None - if hasattr(view, 'parent_model'): + if view and hasattr(view, 'parent_model'): parent_obj = view.get_parent_object() if view and view.request and view.request.user: user_capabilities = get_user_capabilities( diff --git a/awx/main/access.py b/awx/main/access.py index 1105ed0999..7eb8dcecbc 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -220,6 +220,8 @@ class BaseAccess(object): raise LicenseForbids("Features not found in active license.") def get_user_capabilities(self, obj, method_list=[], parent_obj=None): + if obj is None: + return {} user_capabilities = {} # Custom ordering to loop through methods so we can reuse earlier calcs @@ -227,6 +229,17 @@ class BaseAccess(object): if display_method not in method_list: continue + # Validation consistency checks + if display_method == 'copy' and isinstance(obj, JobTemplate): + validation_errors, resources_needed_to_start = obj.resource_validation_data() + if validation_errors: + user_capabilities[display_method] = False + continue + elif display_method == 'start' and isinstance(obj, Group): + if obj.inventory_source and not obj.inventory_source._can_update(): + user_capabilities[display_method] = False + continue + # Grab the answer from the cache, if available if hasattr(obj, 'capabilities_cache') and display_method in obj.capabilities_cache: user_capabilities[display_method] = obj.capabilities_cache[display_method] @@ -243,22 +256,21 @@ class BaseAccess(object): method = display_method # Shortcuts in certain cases by deferring to earlier property - if display_method == 'schedule' and 'edit' in user_capabilities: + if display_method == 'schedule': user_capabilities['schedule'] = user_capabilities['edit'] continue elif display_method == 'delete' and not isinstance(obj, (User, UnifiedJob)): user_capabilities['delete'] = user_capabilities['edit'] continue - if display_method == 'copy' and isinstance(obj, JobTemplate): - validation_errors, resources_needed_to_start = obj.resource_validation_data() - if validation_errors: - user_capabilities['copy'] = False - continue + elif display_method == 'copy' and isinstance(obj, (Group, Host)): + user_capabilities['copy'] = user_capabilities['edit'] + continue # Preprocessing before the access method is called data = {} - if method == 'add' and isinstance(obj, JobTemplate): - data['reference_obj'] = obj + if method == 'add': + if isinstance(obj, JobTemplate): + data['reference_obj'] = obj # Compute permission access_method = getattr(self, "can_%s" % method) @@ -599,7 +611,7 @@ class GroupAccess(BaseAccess): return True def can_start(self, obj): - # Used as another alias to inventory_source start access + # Used as another alias to inventory_source start access for user_capabilities if obj and obj.inventory_source: return self.user.can_access(InventorySource, 'start', obj.inventory_source) return False diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py index c51c29e83c..e52b627076 100644 --- a/awx/main/tests/factories/fixtures.py +++ b/awx/main/tests/factories/fixtures.py @@ -29,7 +29,7 @@ def mk_instance(persisted=True): if not persisted: raise RuntimeError('creating an Instance requires persisted=True') from django.conf import settings - return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, primary=True, hostname="instance.example.org") + return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org") def mk_organization(name, description=None, persisted=True): diff --git a/awx/main/tests/functional/api/test_rbac_displays.py b/awx/main/tests/functional/api/test_rbac_displays.py index d207081929..eb94e01dff 100644 --- a/awx/main/tests/functional/api/test_rbac_displays.py +++ b/awx/main/tests/functional/api/test_rbac_displays.py @@ -5,7 +5,10 @@ from django.test.client import RequestFactory from awx.main.models.jobs import JobTemplate from awx.main.models import Role, Group -from awx.main.access import access_registry +from awx.main.access import ( + access_registry, + get_user_capabilities +) from awx.main.utils import cache_list_capabilities from awx.api.serializers import JobTemplateSerializer @@ -300,3 +303,21 @@ def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_c ]}], JobTemplate, rando) assert qs[0].capabilities_cache == {'copy': True} +@pytest.mark.django_db +def test_group_update_capabilities_possible(group, inventory_source, admin_user): + group.inventory_source = inventory_source + group.save() + + capabilities = get_user_capabilities(admin_user, group, method_list=['start']) + assert capabilities['start'] + +@pytest.mark.django_db +def test_group_update_capabilities_impossible(group, inventory_source, admin_user): + inventory_source.source = "" + inventory_source.save() + group.inventory_source = inventory_source + group.save() + + capabilities = get_user_capabilities(admin_user, group, method_list=['start']) + assert not capabilities['start'] + diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index e5e1222a39..0c620feb7e 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -152,7 +152,7 @@ def user_project(user): @pytest.fixture def instance(settings): - return Instance.objects.create(uuid=settings.SYSTEM_UUID, primary=True, hostname="instance.example.org") + return Instance.objects.create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org") @pytest.fixture def organization(instance):