mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 23:51:09 +03:00
add RBAC definitions for CredentialInputSource
This commit is contained in:
parent
dcf17683e2
commit
35cca68f04
@ -1165,16 +1165,43 @@ class CredentialAccess(BaseAccess):
|
|||||||
|
|
||||||
class CredentialInputSourceAccess(BaseAccess):
|
class CredentialInputSourceAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
I can see credential input sources when:
|
I can see a CredentialInputSource when:
|
||||||
- I'm a superuser (TODO: Update)
|
- I can see the associated target_credential
|
||||||
I can create credential input sources when:
|
I can create/change a CredentialInputSource when:
|
||||||
- I'm a superuser (TODO: Update)
|
- I'm an admin of the associated target_credential
|
||||||
I can delete credential input sources when:
|
- I have use access to the associated source credential
|
||||||
- I'm a superuser (TODO: Update)
|
I can delete a CredentialInputSource when:
|
||||||
|
- I'm an admin of the associated target_credential
|
||||||
'''
|
'''
|
||||||
|
|
||||||
model = CredentialInputSource
|
model = CredentialInputSource
|
||||||
|
|
||||||
|
def filtered_queryset(self):
|
||||||
|
return CredentialInputSource.objects.filter(
|
||||||
|
target_credential__in=Credential.accessible_pk_qs(self.user, 'read_role'))
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
|
def can_read(self, obj):
|
||||||
|
return self.user in obj.target_credential.read_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
|
def can_add(self, data):
|
||||||
|
return (
|
||||||
|
self.check_related('target_credential', Credential, data, role_field='admin_role') and
|
||||||
|
self.check_related('source_credential', Credential, data, role_field='use_role')
|
||||||
|
)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
|
def can_change(self, obj, data):
|
||||||
|
if self.can_add(data) is False:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return self.user in obj.target_credential.admin_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
|
def can_delete(self, obj):
|
||||||
|
return self.user in obj.target_credential.admin_role
|
||||||
|
|
||||||
|
|
||||||
class TeamAccess(BaseAccess):
|
class TeamAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
|
@ -106,6 +106,150 @@ def test_create_credential_input_source_with_external_target_returns_400(post, a
|
|||||||
assert response.data['target_credential'] == ['Target must be a non-external credential']
|
assert response.data['target_credential'] == ['Target must be a non-external credential']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_input_source_rbac_associate(get, post, alice, vault_credential, external_credential):
|
||||||
|
sublist_url = reverse(
|
||||||
|
'api:credential_input_source_sublist',
|
||||||
|
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
||||||
|
)
|
||||||
|
params = {
|
||||||
|
'source_credential': external_credential.pk,
|
||||||
|
'input_field_name': 'vault_password',
|
||||||
|
'associate': True,
|
||||||
|
'metadata': {'key': 'some_key'},
|
||||||
|
}
|
||||||
|
|
||||||
|
# alice can't admin the target *or* source cred
|
||||||
|
response = post(sublist_url, params, alice)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# alice can't use the source cred
|
||||||
|
vault_credential.admin_role.members.add(alice)
|
||||||
|
response = post(sublist_url, params, alice)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# alice is allowed to associate now
|
||||||
|
external_credential.use_role.members.add(alice)
|
||||||
|
response = post(sublist_url, params, alice)
|
||||||
|
assert response.status_code == 201
|
||||||
|
|
||||||
|
# now let's try disassociation
|
||||||
|
detail = get(response.data['url'], alice)
|
||||||
|
assert detail.status_code == 200
|
||||||
|
vault_credential.admin_role.members.remove(alice)
|
||||||
|
external_credential.use_role.members.remove(alice)
|
||||||
|
|
||||||
|
# now that permissions are removed, alice can't *read* the input source
|
||||||
|
assert get(response.data['url'], alice).status_code == 403
|
||||||
|
|
||||||
|
# alice can't admin the target (so she can't remove the input source)
|
||||||
|
params = {
|
||||||
|
'id': detail.data['id'],
|
||||||
|
'disassociate': True
|
||||||
|
}
|
||||||
|
response = post(sublist_url, params, alice)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# alice is allowed to disassociate now
|
||||||
|
vault_credential.admin_role.members.add(alice)
|
||||||
|
response = post(sublist_url, params, alice)
|
||||||
|
assert response.status_code == 204
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_input_source_detail_rbac(get, post, patch, delete, admin, alice,
|
||||||
|
vault_credential, external_credential,
|
||||||
|
other_external_credential):
|
||||||
|
sublist_url = reverse(
|
||||||
|
'api:credential_input_source_sublist',
|
||||||
|
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
||||||
|
)
|
||||||
|
params = {
|
||||||
|
'source_credential': external_credential.pk,
|
||||||
|
'input_field_name': 'vault_password',
|
||||||
|
'associate': True,
|
||||||
|
'metadata': {'key': 'some_key'},
|
||||||
|
}
|
||||||
|
|
||||||
|
response = post(sublist_url, params, admin)
|
||||||
|
assert response.status_code == 201
|
||||||
|
|
||||||
|
url = response.data['url']
|
||||||
|
|
||||||
|
# alice can't read the input source directly because she can't read the target cred
|
||||||
|
detail = get(url, alice)
|
||||||
|
assert detail.status_code == 403
|
||||||
|
|
||||||
|
# alice can read the input source directly
|
||||||
|
vault_credential.read_role.members.add(alice)
|
||||||
|
detail = get(url, alice)
|
||||||
|
assert detail.status_code == 200
|
||||||
|
|
||||||
|
# she can also see it on the credential sublist
|
||||||
|
response = get(sublist_url, admin)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.data['count'] == 1
|
||||||
|
|
||||||
|
# alice can't change or delete the input source because she can't change the target cred
|
||||||
|
assert patch(url, {'input_field_name': 'vault_id'}, alice).status_code == 403
|
||||||
|
assert delete(url, alice).status_code == 403
|
||||||
|
|
||||||
|
# alice can admin the target cred, so she can change the input field name
|
||||||
|
vault_credential.admin_role.members.add(alice)
|
||||||
|
external_credential.use_role.members.add(alice)
|
||||||
|
assert patch(url, {'input_field_name': 'vault_id'}, alice).status_code == 200
|
||||||
|
assert CredentialInputSource.objects.first().input_field_name == 'vault_id'
|
||||||
|
|
||||||
|
# she _cannot_, however, apply a source credential she doesn't have access to
|
||||||
|
assert patch(url, {'source_credential': other_external_credential.pk}, alice).status_code == 403
|
||||||
|
|
||||||
|
assert delete(url, alice).status_code == 204
|
||||||
|
assert CredentialInputSource.objects.count() == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_input_source_rbac_swap_target_credential(get, post, put, patch, admin, alice,
|
||||||
|
machine_credential, vault_credential,
|
||||||
|
external_credential):
|
||||||
|
# If you change the target credential for an input source,
|
||||||
|
# you have to have admin role on the *original* credential (so you can
|
||||||
|
# remove the relationship) *and* on the *new* credential (so you can apply the
|
||||||
|
# new relationship)
|
||||||
|
sublist_url = reverse(
|
||||||
|
'api:credential_input_source_sublist',
|
||||||
|
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
||||||
|
)
|
||||||
|
params = {
|
||||||
|
'source_credential': external_credential.pk,
|
||||||
|
'input_field_name': 'vault_password',
|
||||||
|
'associate': True,
|
||||||
|
'metadata': {'key': 'some_key'},
|
||||||
|
}
|
||||||
|
|
||||||
|
response = post(sublist_url, params, admin)
|
||||||
|
assert response.status_code == 201
|
||||||
|
url = response.data['url']
|
||||||
|
|
||||||
|
# alice can't change target cred because she can't admin either one
|
||||||
|
assert patch(url, {
|
||||||
|
'target_credential': machine_credential.pk,
|
||||||
|
'input_field_name': 'password'
|
||||||
|
}, alice).status_code == 403
|
||||||
|
|
||||||
|
# alice still can't change target cred because she can't admin *the new one*
|
||||||
|
vault_credential.admin_role.members.add(alice)
|
||||||
|
assert patch(url, {
|
||||||
|
'target_credential': machine_credential.pk,
|
||||||
|
'input_field_name': 'password'
|
||||||
|
}, alice).status_code == 403
|
||||||
|
|
||||||
|
machine_credential.admin_role.members.add(alice)
|
||||||
|
assert patch(url, {
|
||||||
|
'target_credential': machine_credential.pk,
|
||||||
|
'input_field_name': 'password'
|
||||||
|
}, alice).status_code == 200
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential):
|
def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential):
|
||||||
sublist_url = reverse(
|
sublist_url = reverse(
|
||||||
|
Loading…
Reference in New Issue
Block a user