1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

fix overindent lint failures

This commit is contained in:
Ryan Petrello 2019-01-30 12:12:39 -05:00
parent 44819987f7
commit 2927803a82
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
7 changed files with 131 additions and 131 deletions

View File

@ -234,17 +234,17 @@ class RoleMetadata(Metadata):
# TODO: Tower 3.3 remove class and all uses in views.py when API v1 is removed
class JobTypeMetadata(Metadata):
def get_field_info(self, field):
res = super(JobTypeMetadata, self).get_field_info(field)
def get_field_info(self, field):
res = super(JobTypeMetadata, self).get_field_info(field)
if field.field_name == 'job_type':
index = 0
for choice in res['choices']:
if choice[0] == 'scan':
res['choices'].pop(index)
break
index += 1
return res
if field.field_name == 'job_type':
index = 0
for choice in res['choices']:
if choice[0] == 'scan':
res['choices'].pop(index)
break
index += 1
return res
class SublistAttachDetatchMetadata(Metadata):

View File

@ -523,7 +523,7 @@ class AuthView(APIView):
not feature_enabled('ldap')) or \
(not feature_enabled('enterprise_auth') and
name in ['saml', 'radius']):
continue
continue
login_url = reverse('social:begin', args=(name,))
complete_url = request.build_absolute_uri(reverse('social:complete', args=(name,)))

View File

@ -38,20 +38,20 @@ class HostManager(models.Manager):
hasattr(self.instance, 'host_filter') and
hasattr(self.instance, 'kind')):
if self.instance.kind == 'smart' and self.instance.host_filter is not None:
q = SmartFilter.query_from_string(self.instance.host_filter)
if self.instance.organization_id:
q = q.filter(inventory__organization=self.instance.organization_id)
# If we are using host_filters, disable the core_filters, this allows
# us to access all of the available Host entries, not just the ones associated
# with a specific FK/relation.
#
# If we don't disable this, a filter of {'inventory': self.instance} gets automatically
# injected by the related object mapper.
self.core_filters = {}
q = SmartFilter.query_from_string(self.instance.host_filter)
if self.instance.organization_id:
q = q.filter(inventory__organization=self.instance.organization_id)
# If we are using host_filters, disable the core_filters, this allows
# us to access all of the available Host entries, not just the ones associated
# with a specific FK/relation.
#
# If we don't disable this, a filter of {'inventory': self.instance} gets automatically
# injected by the related object mapper.
self.core_filters = {}
qs = qs & q
unique_by_name = qs.order_by('name', 'pk').distinct('name')
return qs.filter(pk__in=unique_by_name)
qs = qs & q
unique_by_name = qs.order_by('name', 'pk').distinct('name')
return qs.filter(pk__in=unique_by_name)
return qs

View File

@ -204,7 +204,7 @@ class Role(models.Model):
value = description.get('default')
if '%s' in value and content_type:
value = value % model_name
value = value % model_name
return value

View File

@ -357,10 +357,10 @@ class TaskManager():
return False
def get_latest_project_update(self, job):
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
if not latest_project_update.exists():
return None
return latest_project_update.first()
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
if not latest_project_update.exists():
return None
return latest_project_update.first()
def should_update_related_project(self, job, latest_project_update):
now = tz_now()

View File

@ -82,14 +82,14 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
@pytest.mark.django_db
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj.status = "pending"
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
assert wfj.instance_group is None
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj.status = "pending"
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
assert wfj.instance_group is None
@pytest.mark.django_db

View File

@ -16,117 +16,117 @@ from awx.api.versioning import reverse
@pytest.mark.django_db
class TestOAuth2Application:
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
])
def test_can_read(
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2ApplicationAccess(user_list[user_for_access])
app_creation_user_list = [admin, org_admin]
for user, can_access in zip(app_creation_user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is can_access
def test_admin_only_can_read(self, user, organization):
user = user('org-admin', False)
organization.admin_role.members.add(user)
access = OAuth2ApplicationAccess(user)
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
])
def test_can_read(
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2ApplicationAccess(user_list[user_for_access])
app_creation_user_list = [admin, org_admin]
for user, can_access in zip(app_creation_user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is True
assert access.can_read(app) is can_access
def test_app_activity_stream(self, org_admin, alice, organization):
def test_admin_only_can_read(self, user, organization):
user = user('org-admin', False)
organization.admin_role.members.add(user)
access = OAuth2ApplicationAccess(user)
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is True
def test_app_activity_stream(self, org_admin, alice, organization):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(app) is False
assert access.can_read(activity_stream) is False
def test_token_activity_stream(self, org_admin, alice, organization, post):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, org_admin, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(token) is False
assert access.can_read(activity_stream) is False
def test_can_edit_delete_app_org_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
name='test app for {}'.format(user.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(app) is False
assert access.can_read(activity_stream) is False
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_token_activity_stream(self, org_admin, alice, organization, post):
def test_can_edit_delete_app_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
name='test app for {}'.format(user.username), user=admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, org_admin, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(token) is False
assert access.can_read(activity_stream) is False
def test_can_edit_delete_app_org_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_can_edit_delete_app_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
access = OAuth2ApplicationAccess(admin)
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
access = OAuth2ApplicationAccess(admin)
for user in [admin, org_admin, org_member, alice]:
assert access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password', 'organization': organization.id
})
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
for access_user in [org_member, alice]:
access = OAuth2ApplicationAccess(access_user)
for user in [admin, org_admin, org_member, alice]:
assert access.can_add({
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password', 'organization': organization.id
})
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
for access_user in [org_member, alice]:
access = OAuth2ApplicationAccess(access_user)
for user in [admin, org_admin, org_member, alice]:
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password', 'organization': organization.id
})
@pytest.mark.django_db