1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 06:51:10 +03:00

Added an awx-manage command for generating OAuth2 token.

This commit is contained in:
Yunfan Zhang 2018-05-31 14:27:21 -04:00
parent bd744bced7
commit 2c4f7911a6
3 changed files with 88 additions and 26 deletions

View File

@ -949,7 +949,6 @@ class UserSerializer(BaseSerializer):
tokens = self.reverse('api:o_auth2_token_list', kwargs={'pk': obj.pk}),
authorized_tokens = self.reverse('api:user_authorized_token_list', kwargs={'pk': obj.pk}),
personal_tokens = self.reverse('api:o_auth2_personal_token_list', kwargs={'pk': obj.pk}),
))
return res
@ -986,7 +985,7 @@ class UserSerializer(BaseSerializer):
class UserAuthorizedTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -997,7 +996,7 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
'expires', 'scope', 'application'
)
read_only_fields = ('user', 'token', 'expires')
def get_token(self, obj):
request = self.context.get('request', None)
try:
@ -1006,8 +1005,8 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
else:
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
return ''
def get_refresh_token(self, obj):
request = self.context.get('request', None)
try:
@ -1035,12 +1034,12 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2ApplicationSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']
class Meta:
model = OAuth2Application
fields = (
@ -1053,15 +1052,15 @@ class OAuth2ApplicationSerializer(BaseSerializer):
'user': {'allow_null': True, 'required': False},
'organization': {'allow_null': False},
'authorization_grant_type': {'allow_null': False}
}
}
def to_representation(self, obj):
ret = super(OAuth2ApplicationSerializer, self).to_representation(obj)
if obj.client_type == 'public':
ret.pop('client_secret', None)
return ret
def get_modified(self, obj):
if obj is None:
return None
@ -1170,9 +1169,10 @@ class OAuth2TokenSerializer(BaseSerializer):
).format(self.ALLOWED_SCOPES))
return value
def create(self, validated_data):
current_user = self.context['request'].user
validated_data['user'] = current_user
def create(self, validated_data, from_command_line=False):
if not from_command_line:
current_user = self.context['request'].user
validated_data['user'] = current_user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=settings.OAUTH2_PROVIDER['ACCESS_TOKEN_EXPIRE_SECONDS']
@ -1189,16 +1189,16 @@ class OAuth2TokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2TokenDetailSerializer(OAuth2TokenSerializer):
class Meta:
read_only_fields = ('*', 'user', 'application')
read_only_fields = ('*', 'user', 'application')
class OAuth2AuthorizedTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -1212,7 +1212,7 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
extra_kwargs = {
'scope': {'allow_null': False, 'required': True}
}
def get_token(self, obj):
request = self.context.get('request', None)
try:
@ -1221,8 +1221,8 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
else:
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
return ''
def get_refresh_token(self, obj):
request = self.context.get('request', None)
try:
@ -1232,7 +1232,7 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
def create(self, validated_data):
current_user = self.context['request'].user
validated_data['user'] = current_user
@ -1252,10 +1252,10 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2PersonalTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -1311,7 +1311,7 @@ class OAuth2PersonalTokenSerializer(BaseSerializer):
obj = super(OAuth2PersonalTokenSerializer, self).create(validated_data)
obj.save()
return obj
class OrganizationSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']

View File

@ -0,0 +1,26 @@
from awx.api.serializers import OAuth2TokenSerializer
from django.core.management.base import BaseCommand
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
class Command(BaseCommand):
"""Command that creates an OAuth2 token for a certain user. Returns the value of created token."""
help='Usage: awx-manage create_oauth2_token --user=username. Will return the value of created token.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str)
def handle(self, *args, **options):
if not options['user']:
self.stdout.write("Username not supplied. Usage: awx-manage create_oauth2_token --user=username.")
return
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
self.stdout.write("The user does not exist.")
return
config = {'user': user, 'scope':'write'}
serializer_obj = OAuth2TokenSerializer()
token_record = serializer_obj.create(config, True)
self.stdout.write(token_record.token)

View File

@ -0,0 +1,36 @@
import pytest
import string
import random
import StringIO
from django.contrib.auth.models import User
from django.core.management import call_command
from awx.main.models.oauth import OAuth2AccessToken
@pytest.mark.django_db
@pytest.mark.inventory_import
class TestOAuth2CreateCommand:
def test_no_user_option(self):
out = StringIO.StringIO()
call_command('create_oauth2_token', stdout=out)
assert 'Username not supplied.' in out.getvalue()
out.close()
def test_non_existing_user(self):
out = StringIO.StringIO()
fake_username = ''
while fake_username == '' or User.objects.filter(username=fake_username).exists():
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
arg = '--user=' + fake_username
call_command('create_oauth2_token', arg, stdout=out)
assert 'The user does not exist.' == out.getvalue().strip()
out.close()
def test_correct_user(self, alice):
out = StringIO.StringIO()
arg = '--user=' + 'alice'
call_command('create_oauth2_token', arg, stdout=out)
generated_token = out.getvalue().strip()
assert OAuth2AccessToken.objects.filter(user=alice, token=generated_token).count() == 1
assert OAuth2AccessToken.objects.get(user=alice, token=generated_token).scope == 'write'
out.close()