1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-27 09:25:10 +03:00

Add test method to Credential and CredentialType - awxkit

Add test method to Credential and CredentialTypei - awxkit.

The inclusion of this one was discovered when testing the following
issue. https://github.com/ansible/awx/issues/5141
This commit is contained in:
nixocio 2020-02-13 13:40:09 -05:00
parent f57fff732e
commit da486d7788

View File

@ -1,20 +1,28 @@
import logging
from six.moves import http_client as http
import awxkit.exceptions as exc
from awxkit.api.mixins import DSAdapter, HasCopy, HasCreate
from awxkit.api.pages import Organization, Team, User
from awxkit.api.resources import resources
from awxkit.config import config
from awxkit.utils import (
PseudoNamespace,
cloud_types,
filter_by_class,
not_provided,
random_title,
update_payload,
PseudoNamespace)
from awxkit.api.pages import Organization, User, Team
from awxkit.api.mixins import HasCreate, HasCopy, DSAdapter
from awxkit.api.resources import resources
from awxkit.config import config
)
from . import base
from . import page
from . import base, page
from .page import exception_from_status_code
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
log = logging.getLogger(__name__)
@ -169,6 +177,18 @@ class CredentialType(HasCreate, base.Base):
CredentialTypes(
self.connection).post(payload))
def test(self, data):
"""Test the credential type endpoint."""
response = self.connection.post(urljoin(str(self.url), 'test/'), data)
exception = exception_from_status_code(response.status_code)
exc_str = "%s (%s) received" % (
http.responses[response.status_code], response.status_code)
if exception:
raise exception(exc_str, response.json())
elif response.status_code == http.FORBIDDEN:
raise exc.Forbidden(exc_str, response.json())
return response
page.register_page([resources.credential_type,
(resources.credential_types, 'post')], CredentialType)
@ -302,6 +322,18 @@ class Credential(HasCopy, HasCreate, base.Base):
return self.update_identity(
Credentials(
self.connection)).post(payload)
def test(self, data):
"""Test the credential endpoint."""
response = self.connection.post(urljoin(str(self.url), 'test/'), data)
exception = exception_from_status_code(response.status_code)
exc_str = "%s (%s) received" % (
http.responses[response.status_code], response.status_code)
if exception:
raise exception(exc_str, response.json())
elif response.status_code == http.FORBIDDEN:
raise exc.Forbidden(exc_str, response.json())
return response
@property
def expected_passwords_needed_to_start(self):