1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 23:51:09 +03:00

resolve a few token revocation issues, and add tests

This commit is contained in:
Ryan Petrello 2018-11-08 07:59:44 -05:00
parent 093c29e315
commit 001bd4ca59
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
3 changed files with 116 additions and 34 deletions

View File

@ -0,0 +1,37 @@
# Django
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx.main.models.oauth import OAuth2AccessToken
from oauth2_provider.models import RefreshToken
def revoke_tokens(token_list):
for token in token_list:
token.revoke()
print('revoked {} {}'.format(token.__class__.__name__, token.token))
class Command(BaseCommand):
"""Command that revokes OAuth2 access tokens."""
help='Revokes OAuth2 access tokens. Use --all to revoke access and refresh tokens.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str, help='revoke OAuth2 tokens for a specific username')
parser.add_argument('--all', dest='all', action='store_true', help='revoke OAuth2 access tokens and refresh tokens')
def handle(self, *args, **options):
if not options['user']:
if options['all']:
revoke_tokens(RefreshToken.objects.filter(revoked=None))
revoke_tokens(OAuth2AccessToken.objects.all())
else:
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
raise CommandError('A user with that username does not exist.')
if options['all']:
revoke_tokens(RefreshToken.objects.filter(revoked=None).filter(user=user))
revoke_tokens(user.main_oauth2accesstoken.filter(user=user))

View File

@ -1,34 +0,0 @@
# Django
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx.main.models.oauth import OAuth2AccessToken
from oauth2_provider.models import RefreshToken
def revoke_tokens(token_list):
for token in token_list:
token.revoke()
class Command(BaseCommand):
"""Command that revokes OAuth2 tokens and refresh tokens."""
help='Revokes OAuth2 tokens and refresh tokens.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str)
parser.add_argument('--revoke_refresh', dest='revoke_refresh', action='store_true')
def handle(self, *args, **options):
if not options['user']:
if options['revoke_refresh']:
revoke_tokens(RefreshToken.objects.all())
revoke_tokens(OAuth2AccessToken.objects.all())
else:
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
raise CommandError('The user does not exist.')
if options['revoke_refresh']:
revoke_tokens(RefreshToken.objects.filter(user=user))
revoke_tokens(user.main_oauth2accesstoken.filter(user=user))

View File

@ -0,0 +1,79 @@
# Python
import datetime
import pytest
import string
import random
import StringIO
# Django
from django.core.management import call_command
from django.core.management.base import CommandError
# AWX
from awx.main.models import RefreshToken
from awx.main.models.oauth import OAuth2AccessToken
from awx.api.versioning import reverse
@pytest.mark.django_db
class TestOAuth2RevokeCommand:
def test_non_existing_user(self):
out = StringIO.StringIO()
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
arg = '--user=' + fake_username
with pytest.raises(CommandError) as excinfo:
call_command('revoke_oauth2_tokens', arg, stdout=out)
assert 'A user with that username does not exist' in excinfo.value.message
out.close()
def test_revoke_all_access_tokens(self, post, admin, alice):
url = reverse('api:o_auth2_token_list')
for user in (admin, alice):
post(
url,
{'description': 'test token', 'scope': 'read'},
user
)
assert OAuth2AccessToken.objects.count() == 2
call_command('revoke_oauth2_tokens')
assert OAuth2AccessToken.objects.count() == 0
def test_revoke_access_token_for_user(self, post, admin, alice):
url = reverse('api:o_auth2_token_list')
post(
url,
{'description': 'test token', 'scope': 'read'},
alice
)
assert OAuth2AccessToken.objects.count() == 1
call_command('revoke_oauth2_tokens', '--user=admin')
assert OAuth2AccessToken.objects.count() == 1
call_command('revoke_oauth2_tokens', '--user=alice')
assert OAuth2AccessToken.objects.count() == 0
def test_revoke_all_refresh_tokens(self, post, admin, oauth_application):
url = reverse('api:o_auth2_token_list')
post(
url,
{
'description': 'test token for',
'scope': 'read',
'application': oauth_application.pk
},
admin
)
assert OAuth2AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
call_command('revoke_oauth2_tokens')
assert OAuth2AccessToken.objects.count() == 0
assert RefreshToken.objects.count() == 1
for r in RefreshToken.objects.all():
assert r.revoked is None
call_command('revoke_oauth2_tokens', '--all')
assert RefreshToken.objects.count() == 1
for r in RefreshToken.objects.all():
assert r.revoked is not None
assert isinstance(r.revoked, datetime.datetime)