1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

fixes app token endpoint

This commit is contained in:
adamscmRH 2018-02-23 11:06:53 -05:00
parent 99989892cd
commit 2911dec324
6 changed files with 115 additions and 30 deletions

View File

@ -890,7 +890,7 @@ class UserSerializer(BaseSerializer):
access_list = self.reverse('api:user_access_list', kwargs={'pk': obj.pk}),
applications = self.reverse('api:o_auth2_application_list', kwargs={'pk': obj.pk}),
tokens = self.reverse('api:o_auth2_token_list', kwargs={'pk': obj.pk}),
authorized_tokens = self.reverse('api:o_auth2_authorized_token_list', kwargs={'pk': obj.pk}),
authorized_tokens = self.reverse('api:user_authorized_token_list', kwargs={'pk': obj.pk}),
personal_tokens = self.reverse('api:o_auth2_personal_token_list', kwargs={'pk': obj.pk}),
))
@ -928,7 +928,59 @@ class UserSerializer(BaseSerializer):
return self._validate_ldap_managed_field(value, 'is_superuser')
class OauthApplicationSerializer(BaseSerializer):
class UserAuthorizedTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
class Meta:
model = OAuth2AccessToken
fields = (
'*', '-name', 'description', 'user', 'token', 'refresh_token',
'expires', 'scope', 'application',
)
read_only_fields = ('user', 'token', 'expires')
read_only_on_update_fields = ('application',)
def get_token(self, obj):
request = self.context.get('request', None)
try:
if request.method == 'POST':
return obj.token
else:
return '*************'
except ObjectDoesNotExist:
return ''
def get_refresh_token(self, obj):
request = self.context.get('request', None)
try:
if request.method == 'POST':
return getattr(obj.refresh_token, 'token', '')
else:
return '**************'
except ObjectDoesNotExist:
return ''
def create(self, validated_data):
validated_data['user'] = self.context['request'].user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS
)
obj = super(OAuth2TokenSerializer, self).create(validated_data)
obj.save()
if obj.application is not None:
OAuth2RefreshToken.objects.create(
user=self.context['request'].user,
token=generate_token(),
application=obj.application,
access_token=obj
)
return obj
class OAuth2ApplicationSerializer(BaseSerializer):
class Meta:
model = OAuth2Application
@ -944,7 +996,7 @@ class OauthApplicationSerializer(BaseSerializer):
}
def to_representation(self, obj):
ret = super(OauthApplicationSerializer, self).to_representation(obj)
ret = super(OAuth2ApplicationSerializer, self).to_representation(obj)
if obj.client_type == 'public':
ret.pop('client_secret')
return ret
@ -956,7 +1008,7 @@ class OauthApplicationSerializer(BaseSerializer):
return obj.updated
def get_related(self, obj):
ret = super(OauthApplicationSerializer, self).get_related(obj)
ret = super(OAuth2ApplicationSerializer, self).get_related(obj)
if obj.user:
ret['user'] = self.reverse('api:user_detail', kwargs={'pk': obj.user.pk})
ret['tokens'] = self.reverse(
@ -979,12 +1031,12 @@ class OauthApplicationSerializer(BaseSerializer):
return {'count': token_count, 'results': token_list}
def get_summary_fields(self, obj):
ret = super(OauthApplicationSerializer, self).get_summary_fields(obj)
ret = super(OAuth2ApplicationSerializer, self).get_summary_fields(obj)
ret['tokens'] = self._summary_field_tokens(obj)
return ret
class OauthTokenSerializer(BaseSerializer):
class OAuth2TokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -993,7 +1045,7 @@ class OauthTokenSerializer(BaseSerializer):
model = OAuth2AccessToken
fields = (
'*', '-name', 'description', 'user', 'token', 'refresh_token',
'-application', 'expires', 'scope',
'application', 'expires', 'scope',
)
read_only_fields = ('user', 'token', 'expires')
@ -1003,7 +1055,7 @@ class OauthTokenSerializer(BaseSerializer):
return obj.updated
def get_related(self, obj):
ret = super(OauthTokenSerializer, self).get_related(obj)
ret = super(OAuth2TokenSerializer, self).get_related(obj)
if obj.user:
ret['user'] = self.reverse('api:user_detail', kwargs={'pk': obj.user.pk})
if obj.application:
@ -1036,11 +1088,12 @@ class OauthTokenSerializer(BaseSerializer):
return ''
def create(self, validated_data):
validated_data['user'] = self.context['request'].user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS
)
obj = super(OauthTokenSerializer, self).create(validated_data)
obj = super(OAuth2TokenSerializer, self).create(validated_data)
if obj.application and obj.application.user:
obj.user = obj.application.user
obj.save()
@ -1088,6 +1141,25 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
except ObjectDoesNotExist:
return ''
def create(self, validated_data):
validated_data['user'] = self.context['request'].user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS
)
obj = super(OAuth2AuthorizedTokenSerializer, self).create(validated_data)
if obj.application and obj.application.user:
obj.user = obj.application.user
obj.save()
if obj.application is not None:
OAuth2RefreshToken.objects.create(
user=obj.application.user if obj.application.user else None,
token=generate_token(),
application=obj.application,
access_token=obj
)
return obj
class OAuth2PersonalTokenSerializer(BaseSerializer):
@ -1135,12 +1207,11 @@ class OAuth2PersonalTokenSerializer(BaseSerializer):
return None
def create(self, validated_data):
user = self.context['request'].user
validated_data['user'] = self.context['request'].user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS
)
validated_data['user'] = user
obj = super(OAuth2PersonalTokenSerializer, self).create(validated_data)
obj.save()
return obj

View File

@ -32,7 +32,6 @@ from awx.api.views import (
OAuth2TokenList,
ApplicationOAuth2TokenList,
OAuth2ApplicationDetail,
)
from .organization import urls as organization_urls

View File

@ -17,7 +17,8 @@ from awx.api.views import (
OAuth2ApplicationList,
OAuth2TokenList,
OAuth2AuthorizedTokenList,
OAuth2PersonalTokenList
OAuth2PersonalTokenList,
UserAuthorizedTokenList,
)
urls = [
@ -33,7 +34,7 @@ urls = [
url(r'^(?P<pk>[0-9]+)/access_list/$', UserAccessList.as_view(), name='user_access_list'),
url(r'^(?P<pk>[0-9]+)/applications/$', OAuth2ApplicationList.as_view(), name='o_auth2_application_list'),
url(r'^(?P<pk>[0-9]+)/tokens/$', OAuth2TokenList.as_view(), name='o_auth2_token_list'),
url(r'^(?P<pk>[0-9]+)/authorized_tokens/$', OAuth2AuthorizedTokenList.as_view(), name='o_auth2_authorized_token_list'),
url(r'^(?P<pk>[0-9]+)/authorized_tokens/$', UserAuthorizedTokenList.as_view(), name='user_authorized_token_list'),
url(r'^(?P<pk>[0-9]+)/personal_tokens/$', OAuth2PersonalTokenList.as_view(), name='o_auth2_personal_token_list'),
]

View File

@ -1507,7 +1507,7 @@ class OAuth2ApplicationList(ListCreateAPIView):
view_name = _("OAuth Applications")
model = OAuth2Application
serializer_class = OauthApplicationSerializer
serializer_class = OAuth2ApplicationSerializer
class OAuth2ApplicationDetail(RetrieveUpdateDestroyAPIView):
@ -1515,7 +1515,7 @@ class OAuth2ApplicationDetail(RetrieveUpdateDestroyAPIView):
view_name = _("OAuth Application Detail")
model = OAuth2Application
serializer_class = OauthApplicationSerializer
serializer_class = OAuth2ApplicationSerializer
class ApplicationOAuth2TokenList(SubListCreateAPIView):
@ -1523,7 +1523,7 @@ class ApplicationOAuth2TokenList(SubListCreateAPIView):
view_name = _("OAuth Application Tokens")
model = OAuth2AccessToken
serializer_class = OauthTokenSerializer
serializer_class = OAuth2TokenSerializer
parent_model = OAuth2Application
relationship = 'oauth2accesstoken_set'
parent_key = 'application'
@ -1539,10 +1539,10 @@ class OAuth2ApplicationActivityStreamList(ActivityStreamEnforcementMixin, SubLis
class OAuth2TokenList(ListCreateAPIView):
view_name = _("OAuth Tokens")
view_name = _("OAuth2 Tokens")
model = OAuth2AccessToken
serializer_class = OauthTokenSerializer
serializer_class = OAuth2TokenSerializer
class OAuth2AuthorizedTokenList(SubListCreateAPIView):
@ -1557,6 +1557,20 @@ class OAuth2AuthorizedTokenList(SubListCreateAPIView):
def get_queryset(self):
return get_access_token_model().objects.filter(application__isnull=False, user=self.request.user)
class UserAuthorizedTokenList(SubListCreateAPIView):
view_name = _("OAuth2 User Authorized Access Tokens")
model = OAuth2AccessToken
serializer_class = OAuth2AuthorizedTokenSerializer
parent_model = User
relationship = 'oauth2accesstoken_set'
parent_key = 'user'
def get_queryset(self):
return get_access_token_model().objects.filter(application__isnull=False, user=self.request.user)
class OAuth2PersonalTokenList(SubListCreateAPIView):
@ -1578,7 +1592,7 @@ class OAuth2TokenDetail(RetrieveUpdateDestroyAPIView):
view_name = _("OAuth Token Detail")
model = OAuth2AccessToken
serializer_class = OauthTokenSerializer
serializer_class = OAuth2TokenSerializer
class OAuth2TokenActivityStreamList(ActivityStreamEnforcementMixin, SubListAPIView):

View File

@ -557,7 +557,7 @@ class UserAccess(BaseAccess):
return super(UserAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
class OauthApplicationAccess(BaseAccess):
class OAuth2ApplicationAccess(BaseAccess):
'''
I can read, change or delete OAuth applications when:
- I am a superuser.
@ -592,7 +592,7 @@ class OauthApplicationAccess(BaseAccess):
return set(self.user.admin_of_organizations.all()) & set(user.organizations.all())
class OauthTokenAccess(BaseAccess):
class OAuth2TokenAccess(BaseAccess):
'''
I can read, change or delete an OAuth token when:
- I am a superuser.
@ -621,7 +621,7 @@ class OauthTokenAccess(BaseAccess):
app = get_object_from_data('application', OAuth2Application, data)
if not app:
return True
return OauthApplicationAccess(self.user).can_read(app)
return OAuth2ApplicationAccess(self.user).can_read(app)
class OrganizationAccess(BaseAccess):

View File

@ -1,8 +1,8 @@
import pytest
from awx.main.access import (
OauthApplicationAccess,
OauthTokenAccess,
OAuth2ApplicationAccess,
OAuth2TokenAccess,
)
from awx.main.models.oauth import (
OAuth2Application as Application,
@ -24,7 +24,7 @@ class TestOAuthApplication:
self, admin, org_admin, org_member, alice, user_for_access, can_access_list
):
user_list = [admin, org_admin, org_member, alice]
access = OauthApplicationAccess(user_list[user_for_access])
access = OAuth2ApplicationAccess(user_list[user_for_access])
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
@ -35,7 +35,7 @@ class TestOAuthApplication:
assert access.can_delete(app) is can_access
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice):
access = OauthApplicationAccess(admin)
access = OAuth2ApplicationAccess(admin)
for user in [admin, org_admin, org_member, alice]:
assert access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
@ -44,7 +44,7 @@ class TestOAuthApplication:
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice):
for access_user in [org_member, alice]:
access = OauthApplicationAccess(access_user)
access = OAuth2ApplicationAccess(access_user)
for user in [admin, org_admin, org_member, alice]:
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
@ -52,7 +52,7 @@ class TestOAuthApplication:
})
def test_org_admin_can_create_in_org(self, admin, org_admin, org_member, alice):
access = OauthApplicationAccess(org_admin)
access = OAuth2ApplicationAccess(org_admin)
for user in [admin, alice]:
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
@ -79,7 +79,7 @@ class TestOAuthToken:
self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list
):
user_list = [admin, org_admin, org_member, alice]
access = OauthTokenAccess(user_list[user_for_access])
access = OAuth2TokenAccess(user_list[user_for_access])
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,