mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
Merge pull request #3109 from ryanpetrello/overindent
fix overindent lint failures Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
commit
2162e8e0cc
@ -234,17 +234,17 @@ class RoleMetadata(Metadata):
|
|||||||
|
|
||||||
# TODO: Tower 3.3 remove class and all uses in views.py when API v1 is removed
|
# TODO: Tower 3.3 remove class and all uses in views.py when API v1 is removed
|
||||||
class JobTypeMetadata(Metadata):
|
class JobTypeMetadata(Metadata):
|
||||||
def get_field_info(self, field):
|
def get_field_info(self, field):
|
||||||
res = super(JobTypeMetadata, self).get_field_info(field)
|
res = super(JobTypeMetadata, self).get_field_info(field)
|
||||||
|
|
||||||
if field.field_name == 'job_type':
|
if field.field_name == 'job_type':
|
||||||
index = 0
|
index = 0
|
||||||
for choice in res['choices']:
|
for choice in res['choices']:
|
||||||
if choice[0] == 'scan':
|
if choice[0] == 'scan':
|
||||||
res['choices'].pop(index)
|
res['choices'].pop(index)
|
||||||
break
|
break
|
||||||
index += 1
|
index += 1
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
class SublistAttachDetatchMetadata(Metadata):
|
class SublistAttachDetatchMetadata(Metadata):
|
||||||
|
@ -523,7 +523,7 @@ class AuthView(APIView):
|
|||||||
not feature_enabled('ldap')) or \
|
not feature_enabled('ldap')) or \
|
||||||
(not feature_enabled('enterprise_auth') and
|
(not feature_enabled('enterprise_auth') and
|
||||||
name in ['saml', 'radius']):
|
name in ['saml', 'radius']):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
login_url = reverse('social:begin', args=(name,))
|
login_url = reverse('social:begin', args=(name,))
|
||||||
complete_url = request.build_absolute_uri(reverse('social:complete', args=(name,)))
|
complete_url = request.build_absolute_uri(reverse('social:complete', args=(name,)))
|
||||||
|
@ -38,20 +38,20 @@ class HostManager(models.Manager):
|
|||||||
hasattr(self.instance, 'host_filter') and
|
hasattr(self.instance, 'host_filter') and
|
||||||
hasattr(self.instance, 'kind')):
|
hasattr(self.instance, 'kind')):
|
||||||
if self.instance.kind == 'smart' and self.instance.host_filter is not None:
|
if self.instance.kind == 'smart' and self.instance.host_filter is not None:
|
||||||
q = SmartFilter.query_from_string(self.instance.host_filter)
|
q = SmartFilter.query_from_string(self.instance.host_filter)
|
||||||
if self.instance.organization_id:
|
if self.instance.organization_id:
|
||||||
q = q.filter(inventory__organization=self.instance.organization_id)
|
q = q.filter(inventory__organization=self.instance.organization_id)
|
||||||
# If we are using host_filters, disable the core_filters, this allows
|
# If we are using host_filters, disable the core_filters, this allows
|
||||||
# us to access all of the available Host entries, not just the ones associated
|
# us to access all of the available Host entries, not just the ones associated
|
||||||
# with a specific FK/relation.
|
# with a specific FK/relation.
|
||||||
#
|
#
|
||||||
# If we don't disable this, a filter of {'inventory': self.instance} gets automatically
|
# If we don't disable this, a filter of {'inventory': self.instance} gets automatically
|
||||||
# injected by the related object mapper.
|
# injected by the related object mapper.
|
||||||
self.core_filters = {}
|
self.core_filters = {}
|
||||||
|
|
||||||
qs = qs & q
|
qs = qs & q
|
||||||
unique_by_name = qs.order_by('name', 'pk').distinct('name')
|
unique_by_name = qs.order_by('name', 'pk').distinct('name')
|
||||||
return qs.filter(pk__in=unique_by_name)
|
return qs.filter(pk__in=unique_by_name)
|
||||||
return qs
|
return qs
|
||||||
|
|
||||||
|
|
||||||
|
@ -204,7 +204,7 @@ class Role(models.Model):
|
|||||||
value = description.get('default')
|
value = description.get('default')
|
||||||
|
|
||||||
if '%s' in value and content_type:
|
if '%s' in value and content_type:
|
||||||
value = value % model_name
|
value = value % model_name
|
||||||
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@ -357,10 +357,10 @@ class TaskManager():
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def get_latest_project_update(self, job):
|
def get_latest_project_update(self, job):
|
||||||
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
|
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
|
||||||
if not latest_project_update.exists():
|
if not latest_project_update.exists():
|
||||||
return None
|
return None
|
||||||
return latest_project_update.first()
|
return latest_project_update.first()
|
||||||
|
|
||||||
def should_update_related_project(self, job, latest_project_update):
|
def should_update_related_project(self, job, latest_project_update):
|
||||||
now = tz_now()
|
now = tz_now()
|
||||||
|
@ -82,14 +82,14 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
|
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
|
||||||
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
|
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
|
||||||
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
|
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
|
||||||
wfj.status = "pending"
|
wfj.status = "pending"
|
||||||
wfj.save()
|
wfj.save()
|
||||||
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
|
TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
|
||||||
assert wfj.instance_group is None
|
assert wfj.instance_group is None
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
@ -16,117 +16,117 @@ from awx.api.versioning import reverse
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
class TestOAuth2Application:
|
class TestOAuth2Application:
|
||||||
|
|
||||||
@pytest.mark.parametrize("user_for_access, can_access_list", [
|
@pytest.mark.parametrize("user_for_access, can_access_list", [
|
||||||
(0, [True, True]),
|
(0, [True, True]),
|
||||||
(1, [True, True]),
|
(1, [True, True]),
|
||||||
(2, [True, True]),
|
(2, [True, True]),
|
||||||
(3, [False, False]),
|
(3, [False, False]),
|
||||||
])
|
])
|
||||||
def test_can_read(
|
def test_can_read(
|
||||||
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
|
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
|
||||||
):
|
):
|
||||||
user_list = [admin, org_admin, org_member, alice]
|
user_list = [admin, org_admin, org_member, alice]
|
||||||
access = OAuth2ApplicationAccess(user_list[user_for_access])
|
access = OAuth2ApplicationAccess(user_list[user_for_access])
|
||||||
app_creation_user_list = [admin, org_admin]
|
app_creation_user_list = [admin, org_admin]
|
||||||
for user, can_access in zip(app_creation_user_list, can_access_list):
|
for user, can_access in zip(app_creation_user_list, can_access_list):
|
||||||
app = Application.objects.create(
|
|
||||||
name='test app for {}'.format(user.username), user=user,
|
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
||||||
)
|
|
||||||
assert access.can_read(app) is can_access
|
|
||||||
|
|
||||||
def test_admin_only_can_read(self, user, organization):
|
|
||||||
user = user('org-admin', False)
|
|
||||||
organization.admin_role.members.add(user)
|
|
||||||
access = OAuth2ApplicationAccess(user)
|
|
||||||
app = Application.objects.create(
|
app = Application.objects.create(
|
||||||
name='test app for {}'.format(user.username), user=user,
|
name='test app for {}'.format(user.username), user=user,
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
)
|
)
|
||||||
assert access.can_read(app) is True
|
assert access.can_read(app) is can_access
|
||||||
|
|
||||||
def test_app_activity_stream(self, org_admin, alice, organization):
|
def test_admin_only_can_read(self, user, organization):
|
||||||
|
user = user('org-admin', False)
|
||||||
|
organization.admin_role.members.add(user)
|
||||||
|
access = OAuth2ApplicationAccess(user)
|
||||||
|
app = Application.objects.create(
|
||||||
|
name='test app for {}'.format(user.username), user=user,
|
||||||
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
|
)
|
||||||
|
assert access.can_read(app) is True
|
||||||
|
|
||||||
|
def test_app_activity_stream(self, org_admin, alice, organization):
|
||||||
|
app = Application.objects.create(
|
||||||
|
name='test app for {}'.format(org_admin.username), user=org_admin,
|
||||||
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
|
)
|
||||||
|
access = OAuth2ApplicationAccess(org_admin)
|
||||||
|
assert access.can_read(app) is True
|
||||||
|
access = ActivityStreamAccess(org_admin)
|
||||||
|
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
|
||||||
|
assert access.can_read(activity_stream) is True
|
||||||
|
access = ActivityStreamAccess(alice)
|
||||||
|
assert access.can_read(app) is False
|
||||||
|
assert access.can_read(activity_stream) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_activity_stream(self, org_admin, alice, organization, post):
|
||||||
|
app = Application.objects.create(
|
||||||
|
name='test app for {}'.format(org_admin.username), user=org_admin,
|
||||||
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
|
)
|
||||||
|
response = post(
|
||||||
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
||||||
|
{'scope': 'read'}, org_admin, expect=201
|
||||||
|
)
|
||||||
|
token = AccessToken.objects.get(token=response.data['token'])
|
||||||
|
access = OAuth2ApplicationAccess(org_admin)
|
||||||
|
assert access.can_read(app) is True
|
||||||
|
access = ActivityStreamAccess(org_admin)
|
||||||
|
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
|
||||||
|
assert access.can_read(activity_stream) is True
|
||||||
|
access = ActivityStreamAccess(alice)
|
||||||
|
assert access.can_read(token) is False
|
||||||
|
assert access.can_read(activity_stream) is False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def test_can_edit_delete_app_org_admin(
|
||||||
|
self, admin, org_admin, org_member, alice, organization
|
||||||
|
):
|
||||||
|
user_list = [admin, org_admin, org_member, alice]
|
||||||
|
can_access_list = [True, True, False, False]
|
||||||
|
for user, can_access in zip(user_list, can_access_list):
|
||||||
app = Application.objects.create(
|
app = Application.objects.create(
|
||||||
name='test app for {}'.format(org_admin.username), user=org_admin,
|
name='test app for {}'.format(user.username), user=org_admin,
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
)
|
)
|
||||||
access = OAuth2ApplicationAccess(org_admin)
|
access = OAuth2ApplicationAccess(user)
|
||||||
assert access.can_read(app) is True
|
assert access.can_change(app, {}) is can_access
|
||||||
access = ActivityStreamAccess(org_admin)
|
assert access.can_delete(app) is can_access
|
||||||
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
|
|
||||||
assert access.can_read(activity_stream) is True
|
|
||||||
access = ActivityStreamAccess(alice)
|
|
||||||
assert access.can_read(app) is False
|
|
||||||
assert access.can_read(activity_stream) is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_token_activity_stream(self, org_admin, alice, organization, post):
|
def test_can_edit_delete_app_admin(
|
||||||
|
self, admin, org_admin, org_member, alice, organization
|
||||||
|
):
|
||||||
|
user_list = [admin, org_admin, org_member, alice]
|
||||||
|
can_access_list = [True, True, False, False]
|
||||||
|
for user, can_access in zip(user_list, can_access_list):
|
||||||
app = Application.objects.create(
|
app = Application.objects.create(
|
||||||
name='test app for {}'.format(org_admin.username), user=org_admin,
|
name='test app for {}'.format(user.username), user=admin,
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
||||||
)
|
)
|
||||||
response = post(
|
access = OAuth2ApplicationAccess(user)
|
||||||
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
assert access.can_change(app, {}) is can_access
|
||||||
{'scope': 'read'}, org_admin, expect=201
|
assert access.can_delete(app) is can_access
|
||||||
)
|
|
||||||
token = AccessToken.objects.get(token=response.data['token'])
|
|
||||||
access = OAuth2ApplicationAccess(org_admin)
|
|
||||||
assert access.can_read(app) is True
|
|
||||||
access = ActivityStreamAccess(org_admin)
|
|
||||||
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
|
|
||||||
assert access.can_read(activity_stream) is True
|
|
||||||
access = ActivityStreamAccess(alice)
|
|
||||||
assert access.can_read(token) is False
|
|
||||||
assert access.can_read(activity_stream) is False
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_can_edit_delete_app_org_admin(
|
|
||||||
self, admin, org_admin, org_member, alice, organization
|
|
||||||
):
|
|
||||||
user_list = [admin, org_admin, org_member, alice]
|
|
||||||
can_access_list = [True, True, False, False]
|
|
||||||
for user, can_access in zip(user_list, can_access_list):
|
|
||||||
app = Application.objects.create(
|
|
||||||
name='test app for {}'.format(user.username), user=org_admin,
|
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
||||||
)
|
|
||||||
access = OAuth2ApplicationAccess(user)
|
|
||||||
assert access.can_change(app, {}) is can_access
|
|
||||||
assert access.can_delete(app) is can_access
|
|
||||||
|
|
||||||
|
|
||||||
def test_can_edit_delete_app_admin(
|
|
||||||
self, admin, org_admin, org_member, alice, organization
|
|
||||||
):
|
|
||||||
user_list = [admin, org_admin, org_member, alice]
|
|
||||||
can_access_list = [True, True, False, False]
|
|
||||||
for user, can_access in zip(user_list, can_access_list):
|
|
||||||
app = Application.objects.create(
|
|
||||||
name='test app for {}'.format(user.username), user=admin,
|
|
||||||
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
||||||
)
|
|
||||||
access = OAuth2ApplicationAccess(user)
|
|
||||||
assert access.can_change(app, {}) is can_access
|
|
||||||
assert access.can_delete(app) is can_access
|
|
||||||
|
|
||||||
|
|
||||||
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
|
|
||||||
access = OAuth2ApplicationAccess(admin)
|
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
|
||||||
|
access = OAuth2ApplicationAccess(admin)
|
||||||
|
for user in [admin, org_admin, org_member, alice]:
|
||||||
|
assert access.can_add({
|
||||||
|
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
||||||
|
'authorization_grant_type': 'password', 'organization': organization.id
|
||||||
|
})
|
||||||
|
|
||||||
|
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
|
||||||
|
for access_user in [org_member, alice]:
|
||||||
|
access = OAuth2ApplicationAccess(access_user)
|
||||||
for user in [admin, org_admin, org_member, alice]:
|
for user in [admin, org_admin, org_member, alice]:
|
||||||
assert access.can_add({
|
assert not access.can_add({
|
||||||
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
||||||
'authorization_grant_type': 'password', 'organization': organization.id
|
'authorization_grant_type': 'password', 'organization': organization.id
|
||||||
})
|
})
|
||||||
|
|
||||||
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
|
|
||||||
for access_user in [org_member, alice]:
|
|
||||||
access = OAuth2ApplicationAccess(access_user)
|
|
||||||
for user in [admin, org_admin, org_member, alice]:
|
|
||||||
assert not access.can_add({
|
|
||||||
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
|
||||||
'authorization_grant_type': 'password', 'organization': organization.id
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
Loading…
Reference in New Issue
Block a user