diff --git a/awx/main/management/commands/revoke_oauth2_tokens.py b/awx/main/management/commands/revoke_oauth2_tokens.py new file mode 100644 index 0000000000..ff6af19a37 --- /dev/null +++ b/awx/main/management/commands/revoke_oauth2_tokens.py @@ -0,0 +1,37 @@ +# Django +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth.models import User +from django.core.exceptions import ObjectDoesNotExist + +# AWX +from awx.main.models.oauth import OAuth2AccessToken +from oauth2_provider.models import RefreshToken + + +def revoke_tokens(token_list): + for token in token_list: + token.revoke() + print('revoked {} {}'.format(token.__class__.__name__, token.token)) + + +class Command(BaseCommand): + """Command that revokes OAuth2 access tokens.""" + help='Revokes OAuth2 access tokens. Use --all to revoke access and refresh tokens.' + + def add_arguments(self, parser): + parser.add_argument('--user', dest='user', type=str, help='revoke OAuth2 tokens for a specific username') + parser.add_argument('--all', dest='all', action='store_true', help='revoke OAuth2 access tokens and refresh tokens') + + def handle(self, *args, **options): + if not options['user']: + if options['all']: + revoke_tokens(RefreshToken.objects.filter(revoked=None)) + revoke_tokens(OAuth2AccessToken.objects.all()) + else: + try: + user = User.objects.get(username=options['user']) + except ObjectDoesNotExist: + raise CommandError('A user with that username does not exist.') + if options['all']: + revoke_tokens(RefreshToken.objects.filter(revoked=None).filter(user=user)) + revoke_tokens(user.main_oauth2accesstoken.filter(user=user)) diff --git a/awx/main/management/commands/revoke_tokens.py b/awx/main/management/commands/revoke_tokens.py deleted file mode 100644 index d2b524b72e..0000000000 --- a/awx/main/management/commands/revoke_tokens.py +++ /dev/null @@ -1,34 +0,0 @@ -# Django -from django.core.management.base import BaseCommand, CommandError -from django.contrib.auth.models import User -from django.core.exceptions import ObjectDoesNotExist - -# AWX -from awx.main.models.oauth import OAuth2AccessToken -from oauth2_provider.models import RefreshToken - -def revoke_tokens(token_list): - for token in token_list: - token.revoke() - -class Command(BaseCommand): - """Command that revokes OAuth2 tokens and refresh tokens.""" - help='Revokes OAuth2 tokens and refresh tokens.' - - def add_arguments(self, parser): - parser.add_argument('--user', dest='user', type=str) - parser.add_argument('--revoke_refresh', dest='revoke_refresh', action='store_true') - - def handle(self, *args, **options): - if not options['user']: - if options['revoke_refresh']: - revoke_tokens(RefreshToken.objects.all()) - revoke_tokens(OAuth2AccessToken.objects.all()) - else: - try: - user = User.objects.get(username=options['user']) - except ObjectDoesNotExist: - raise CommandError('The user does not exist.') - if options['revoke_refresh']: - revoke_tokens(RefreshToken.objects.filter(user=user)) - revoke_tokens(user.main_oauth2accesstoken.filter(user=user)) diff --git a/awx/main/tests/functional/commands/test_oauth2_token_revoke.py b/awx/main/tests/functional/commands/test_oauth2_token_revoke.py new file mode 100644 index 0000000000..0bb355da24 --- /dev/null +++ b/awx/main/tests/functional/commands/test_oauth2_token_revoke.py @@ -0,0 +1,79 @@ +# Python +import datetime +import pytest +import string +import random +import StringIO + +# Django +from django.core.management import call_command +from django.core.management.base import CommandError + +# AWX +from awx.main.models import RefreshToken +from awx.main.models.oauth import OAuth2AccessToken +from awx.api.versioning import reverse + + +@pytest.mark.django_db +class TestOAuth2RevokeCommand: + + def test_non_existing_user(self): + out = StringIO.StringIO() + fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + arg = '--user=' + fake_username + with pytest.raises(CommandError) as excinfo: + call_command('revoke_oauth2_tokens', arg, stdout=out) + assert 'A user with that username does not exist' in excinfo.value.message + out.close() + + def test_revoke_all_access_tokens(self, post, admin, alice): + url = reverse('api:o_auth2_token_list') + for user in (admin, alice): + post( + url, + {'description': 'test token', 'scope': 'read'}, + user + ) + assert OAuth2AccessToken.objects.count() == 2 + call_command('revoke_oauth2_tokens') + assert OAuth2AccessToken.objects.count() == 0 + + def test_revoke_access_token_for_user(self, post, admin, alice): + url = reverse('api:o_auth2_token_list') + post( + url, + {'description': 'test token', 'scope': 'read'}, + alice + ) + assert OAuth2AccessToken.objects.count() == 1 + call_command('revoke_oauth2_tokens', '--user=admin') + assert OAuth2AccessToken.objects.count() == 1 + call_command('revoke_oauth2_tokens', '--user=alice') + assert OAuth2AccessToken.objects.count() == 0 + + def test_revoke_all_refresh_tokens(self, post, admin, oauth_application): + url = reverse('api:o_auth2_token_list') + post( + url, + { + 'description': 'test token for', + 'scope': 'read', + 'application': oauth_application.pk + }, + admin + ) + assert OAuth2AccessToken.objects.count() == 1 + assert RefreshToken.objects.count() == 1 + + call_command('revoke_oauth2_tokens') + assert OAuth2AccessToken.objects.count() == 0 + assert RefreshToken.objects.count() == 1 + for r in RefreshToken.objects.all(): + assert r.revoked is None + + call_command('revoke_oauth2_tokens', '--all') + assert RefreshToken.objects.count() == 1 + for r in RefreshToken.objects.all(): + assert r.revoked is not None + assert isinstance(r.revoked, datetime.datetime)