1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

Merge pull request #81 from chrismeyersfsu/feature-survey_password_redact

redact survey password fields in job detail extra_vars
This commit is contained in:
Chris Meyers 2015-02-19 11:47:17 -05:00
commit 71e0469fab
3 changed files with 62 additions and 16 deletions

View File

@ -33,6 +33,7 @@ from polymorphic import PolymorphicModel
from awx.main.constants import SCHEDULEABLE_PROVIDERS
from awx.main.models import * # noqa
from awx.main.utils import get_type_for_model, get_model_for_type
from awx.main.redact import REPLACE_STR
logger = logging.getLogger('awx.api.serializers')
@ -1419,6 +1420,13 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
return ret
if 'job_template' in ret and (not obj.job_template or not obj.job_template.active):
ret['job_template'] = None
if obj.job_template and obj.job_template.survey_enabled:
extra_vars = json.loads(ret['extra_vars'])
for key in obj.job_template.survey_password_variables():
if key in extra_vars:
extra_vars[key] = REPLACE_STR
ret['extra_vars'] = json.dumps(extra_vars)
return ret

View File

@ -209,6 +209,15 @@ class JobTemplate(UnifiedJobTemplate, JobOptions):
vars.append(survey_element['variable'])
return vars
def survey_password_variables(self):
vars = []
if self.survey_enabled and 'spec' in self.survey_spec:
# Get variables that are type password
for survey_element in self.survey_spec['spec']:
if survey_element['type'] == 'password':
vars.append(survey_element['variable'])
return vars
def survey_variable_validation(self, data):
errors = []
if not self.survey_enabled:
@ -458,14 +467,8 @@ class Job(UnifiedJob, JobOptions):
# Then lookup password fields in extra_vars and save the values
jt = self.job_template
if jt and jt.survey_enabled and 'spec' in jt.survey_spec:
vars = []
# Get variables that are type password
for survey_element in jt.survey_spec['spec']:
if survey_element['type'] == 'password':
vars.append(survey_element['variable'])
# Use password vars to find in extra_vars
for key in vars:
for key in jt.survey_password_variables():
if key in self.extra_vars_dict:
content = PlainTextCleaner.remove_sensitive(content, self.extra_vars_dict[key])
return content

View File

@ -8,7 +8,7 @@ from django.core.urlresolvers import reverse
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['SurveyPasswordTest']
__all__ = ['SurveyPasswordRedactedTest']
PASSWORD="5m/h"
ENCRYPTED_STR='$encrypted$'
@ -154,11 +154,41 @@ class SurveyPasswordBaseTest(BaseTest):
self.check_found(response['content'], ENCRYPTED_STR, test['occurances'], test['description'])
def _get_url_job_stdout(self, job):
job_stdout_url = reverse('api:job_stdout', args=(job.pk,))
return self.get(job_stdout_url, expect=200, auth=self.get_super_credentials(), accept='application/json')
# TODO: A more complete test would ensure that the variable value isn't found
def check_extra_vars_redacted(self, test, response):
self.assertIsNotNone(response)
# Ensure that all extra_vars of type password have the value '$encrypted$'
vars = []
for question in test['survey']['spec']:
if question['type'] == 'password':
vars.append(question['variable'])
class SurveyPasswordTest(SurveyPasswordBaseTest):
extra_vars = json.loads(response['extra_vars'])
for var in vars:
self.assertIn(var, extra_vars, 'Variable "%s" should exist in "%s"' % (var, extra_vars))
self.assertEqual(extra_vars[var], ENCRYPTED_STR)
def _get_url_job_stdout(self, job):
url = reverse('api:job_stdout', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
def _get_url_job_details(self, job):
url = reverse('api:job_detail', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
class SurveyPasswordRedactedTest(SurveyPasswordBaseTest):
'''
Transpose TEST[]['tests'] to the below format. A more flat format."
[
{
'text': '...',
'description': '...',
...,
'job': '...',
'survey': '...'
},
]
'''
def setup_test(self, test_name):
blueprint = TESTS[test_name]
self.tests[test_name] = []
@ -178,25 +208,30 @@ class SurveyPasswordTest(SurveyPasswordBaseTest):
job.result_stdout_text = test['text']
job.save()
test['job'] = job
test['survey'] = blueprint['survey']
self.tests[test_name].append(test)
def setUp(self):
super(SurveyPasswordTest, self).setUp()
super(SurveyPasswordRedactedTest, self).setUp()
self.tests = {}
self.setup_test('simple')
self.setup_test('complex')
# should redact single variable survey
def test_survey_password_redact_simple_survey(self):
def test_redact_stdout_simple_survey(self):
for test in self.tests['simple']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact multiple variables survey
def test_survey_password_redact_complex_survey(self):
def test_redact_stdout_complex_survey(self):
for test in self.tests['complex']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact values in extra_vars
def test_redact_job_extra_vars(self):
for test in self.tests['simple']:
response = self._get_url_job_details(test['job'])
self.check_extra_vars_redacted(test, response)