1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

Merge pull request #1374 from chrismeyersfsu/fix-proj_update_redact

redact project update urls when downloading stdout
This commit is contained in:
Chris Meyers 2018-04-19 09:09:24 -04:00 committed by GitHub
commit df298cec36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 68 deletions

View File

@ -77,6 +77,7 @@ from awx.main.utils import (
from awx.main.utils.encryption import encrypt_value from awx.main.utils.encryption import encrypt_value
from awx.main.utils.filters import SmartFilter from awx.main.utils.filters import SmartFilter
from awx.main.utils.insights import filter_insights_api_response from awx.main.utils.insights import filter_insights_api_response
from awx.main.redact import UriCleaner
from awx.api.permissions import ( from awx.api.permissions import (
JobTemplateCallbackPermission, JobTemplateCallbackPermission,
TaskPermission, TaskPermission,
@ -4639,9 +4640,17 @@ class UnifiedJobList(ListAPIView):
serializer_class = UnifiedJobListSerializer serializer_class = UnifiedJobListSerializer
def redact_ansi(line):
    """Return ``line`` with ANSI escape sequences stripped.

    Two passes are made over the text:

    1. Embedded event data is removed: an erase-line sequence, one or more
       base64 chunks each followed by a cursor-back sequence, and a closing
       erase-line sequence.
    2. Every remaining escape sequence (colors and other control sequences)
       is removed.

    The second pass matches escape sequences structurally instead of
    "ESC up to the next ``m``", so a sequence that does not end in ``m``
    (for example a stray ``ESC [ K``) no longer swallows the ordinary text
    that follows it.

    :param line: text to clean.
    :returns: the text without escape sequences.
    """
    # Remove ANSI escape sequences used to embed event data.
    line = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', line)
    # Remove the remaining escape sequences. First alternative: CSI
    # sequences (ESC [ parameters intermediates final-byte), which covers
    # the color codes. Second alternative: other short escapes such as the
    # charset reset "ESC ( B" that commonly accompanies a color reset.
    return re.sub(r'\x1b(?:\[[0-?]*[ -/]*[@-~]|[ -/]*[0-~])', '', line)
class StdoutFilter(object):
def __init__(self, fileobj): def __init__(self, fileobj):
self._functions = []
self.fileobj = fileobj self.fileobj = fileobj
self.extra_data = '' self.extra_data = ''
if hasattr(fileobj, 'close'): if hasattr(fileobj, 'close'):
@ -4653,10 +4662,7 @@ class StdoutANSIFilter(object):
line = self.fileobj.readline(size) line = self.fileobj.readline(size)
if not line: if not line:
break break
# Remove ANSI escape sequences used to embed event data. line = self.process_line(line)
line = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', line)
# Remove ANSI color escape sequences.
line = re.sub(r'\x1b[^m]*m', '', line)
data += line data += line
if size > 0 and len(data) > size: if size > 0 and len(data) > size:
self.extra_data = data[size:] self.extra_data = data[size:]
@ -4665,6 +4671,14 @@ class StdoutANSIFilter(object):
self.extra_data = '' self.extra_data = ''
return data return data
def register(self, func):
    """Add ``func`` to the end of this filter's list of line callbacks.

    :param func: callable taking one line and returning the processed line.
    """
    callbacks = self._functions
    callbacks.append(func)
def process_line(self, line):
    """Run ``line`` through every registered callback, in list order.

    Each callback receives the output of the previous one; with no
    callbacks registered the line is returned unchanged.

    :param line: the line to process.
    :returns: the result of the last callback.
    """
    result = line
    for transform in self._functions:
        result = transform(result)
    return result
class UnifiedJobStdout(RetrieveAPIView): class UnifiedJobStdout(RetrieveAPIView):
@ -4722,9 +4736,12 @@ class UnifiedJobStdout(RetrieveAPIView):
suffix='.ansi' if target_format == 'ansi_download' else '' suffix='.ansi' if target_format == 'ansi_download' else ''
) )
content_fd = unified_job.result_stdout_raw_handle(enforce_max_bytes=False) content_fd = unified_job.result_stdout_raw_handle(enforce_max_bytes=False)
redactor = StdoutFilter(content_fd)
if target_format == 'txt_download': if target_format == 'txt_download':
content_fd = StdoutANSIFilter(content_fd) redactor.register(redact_ansi)
response = HttpResponse(FileWrapper(content_fd), content_type='text/plain') if type(unified_job) == ProjectUpdate:
redactor.register(UriCleaner.remove_sensitive)
response = HttpResponse(FileWrapper(redactor), content_type='text/plain')
response["Content-Disposition"] = 'attachment; filename="{}"'.format(filename) response["Content-Disposition"] = 'attachment; filename="{}"'.format(filename)
return response return response
else: else:

View File

@ -6,8 +6,7 @@ REPLACE_STR = '$encrypted$'
class UriCleaner(object): class UriCleaner(object):
REPLACE_STR = REPLACE_STR REPLACE_STR = REPLACE_STR
# https://regex101.com/r/sV2dO2/2 SENSITIVE_URI_PATTERN = re.compile(ur'(\w+:(\/?\/?)[^\s]+)', re.MULTILINE) # NOQA
SENSITIVE_URI_PATTERN = re.compile(ur'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb\u201c\u201d\u2018\u2019]))', re.MULTILINE) # NOQA
@staticmethod @staticmethod
def remove_sensitive(cleartext): def remove_sensitive(cleartext):
@ -17,38 +16,46 @@ class UriCleaner(object):
match = UriCleaner.SENSITIVE_URI_PATTERN.search(redactedtext, text_index) match = UriCleaner.SENSITIVE_URI_PATTERN.search(redactedtext, text_index)
if not match: if not match:
break break
o = urlparse.urlsplit(match.group(1)) try:
if not o.username and not o.password: uri_str = match.group(1)
if o.netloc and ":" in o.netloc: # May raise a ValueError if invalid URI for one reason or another
# Handle the special case url http://username:password that can appear in SCM url o = urlparse.urlsplit(uri_str)
# on account of a bug? in ansible redaction
(username, password) = o.netloc.split(':') if not o.username and not o.password:
if o.netloc and ":" in o.netloc:
# Handle the special case url http://username:password that can appear in SCM url
# on account of a bug? in ansible redaction
(username, password) = o.netloc.split(':')
else:
text_index += len(match.group(1))
continue
else: else:
text_index += len(match.group(1)) username = o.username
continue password = o.password
else:
username = o.username
password = o.password
# Given a python MatchObject, with respect to redactedtext, find and # Given a python MatchObject, with respect to redactedtext, find and
# replace the first occurrence of username and the first and second # replace the first occurrence of username and the first and second
# occurrence of password # occurrence of password
uri_str = redactedtext[match.start():match.end()] uri_str = redactedtext[match.start():match.end()]
if username: if username:
uri_str = uri_str.replace(username, UriCleaner.REPLACE_STR, 1) uri_str = uri_str.replace(username, UriCleaner.REPLACE_STR, 1)
# 2, just in case the password is $encrypted$ # 2, just in case the password is $encrypted$
if password: if password:
uri_str = uri_str.replace(password, UriCleaner.REPLACE_STR, 2) uri_str = uri_str.replace(password, UriCleaner.REPLACE_STR, 2)
t = redactedtext[:match.start()] + uri_str t = redactedtext[:match.start()] + uri_str
text_index = len(t) text_index = len(t)
if (match.end() < len(redactedtext)): if (match.end() < len(redactedtext)):
t += redactedtext[match.end():] t += redactedtext[match.end():]
redactedtext = t redactedtext = t
if text_index >= len(redactedtext): if text_index >= len(redactedtext):
text_index = len(redactedtext) - 1 text_index = len(redactedtext) - 1
except ValueError:
# Invalid URI, redact the whole URI to be safe
redactedtext = redactedtext[:match.start()] + UriCleaner.REPLACE_STR + redactedtext[match.end():]
text_index = match.start() + len(UriCleaner.REPLACE_STR)
return redactedtext return redactedtext

View File

@ -1,4 +1,5 @@
import textwrap import textwrap
import pytest
# AWX # AWX
from awx.main.redact import UriCleaner from awx.main.redact import UriCleaner
@ -78,60 +79,76 @@ TEST_CLEARTEXT.append({
}) })
@pytest.mark.parametrize('username, password, not_uri, expected', [
    ('', '', 'www.famfamfam.com](http://www.famfamfam.com/fijdlfd', 'www.famfamfam.com](http://www.famfamfam.com/fijdlfd'),
    ('', '', 'https://www.famfamfam.com](http://www.famfamfam.com/fijdlfd', '$encrypted$'),
    ('root', 'gigity', 'https://root@gigity@www.famfamfam.com](http://www.famfamfam.com/fijdlfd', '$encrypted$'),
    ('root', 'gigity@', 'https://root:gigity@@@www.famfamfam.com](http://www.famfamfam.com/fijdlfd', '$encrypted$'),
])
def test_non_uri_redact(username, password, not_uri, expected):
    """Check ``remove_sensitive`` on URI-like text with embedded markup.

    The cleaned text must equal ``expected`` and must not contain the
    username or the password whenever those are non-empty.
    """
    cleaned = UriCleaner.remove_sensitive(not_uri)
    # Empty credentials are skipped: '' is a substring of every string.
    for secret in (username, password):
        if secret:
            assert secret not in cleaned

    assert cleaned == expected
def test_multiple_non_uri_redact():
    """Check a single text holding several URI-like strings.

    The input holds three URI-like strings with embedded markup and one
    URI with credentials, separated by plain words. The output is compared
    against the exact expected string.
    """
    pieces = (
        'https://www.famfamfam.com](http://www.famfamfam.com/fijdlfd hi ',
        'https://www.famfamfam.com](http://www.famfamfam.com/fijdlfd world ',
        'https://www.famfamfam.com](http://www.famfamfam.com/fijdlfd foo ',
        'https://foo:bar@giggity.com bar',
    )
    expected = '$encrypted$ hi $encrypted$ world $encrypted$ foo https://$encrypted$:$encrypted$@giggity.com bar'
    assert UriCleaner.remove_sensitive(''.join(pieces)) == expected
# should replace secret data with safe string, UriCleaner.REPLACE_STR # should replace secret data with safe string, UriCleaner.REPLACE_STR
@pytest.mark.parametrize('uri', TEST_URIS)
def test_uri_scm_simple_replaced(uri):
    """Count the replacement markers produced for a single URI.

    The number of ``UriCleaner.REPLACE_STR`` occurrences in the cleaned
    text must equal the URI's own ``get_secret_count()``.
    """
    cleaned = UriCleaner.remove_sensitive(str(uri))
    marker_count = cleaned.count(UriCleaner.REPLACE_STR)
    assert marker_count == uri.get_secret_count()
# should redact multiple uris in text # should redact multiple uris in text
@pytest.mark.parametrize('uri', TEST_URIS)
def test_uri_scm_multiple(uri):
    """Check that credentials are removed when a URI occurs more than once.

    The text under test contains the same URI twice: once followed by a
    space and once followed by a newline. Neither the username nor the
    password (when set) may survive in the cleaned text.
    """
    cleartext = str(uri) + ' ' + str(uri) + '\n'

    # Clean the combined text, not a single URI; otherwise this test would
    # never exercise the multiple-URI case it is named for.
    redacted_str = UriCleaner.remove_sensitive(cleartext)
    if uri.username:
        assert uri.username not in redacted_str
    if uri.password:
        assert uri.password not in redacted_str
# should replace multiple secret data with safe string # should replace multiple secret data with safe string
@pytest.mark.parametrize('uri', TEST_URIS)
def test_uri_scm_multiple_replaced(uri):
    """Count the replacement markers when a URI occurs twice in one text.

    The text holds the URI followed by a space and then the URI followed
    by a newline; the expected marker count is the URI's
    ``get_secret_count()`` added once per occurrence.
    """
    terminators = (' ', '\n')
    cleartext = ''.join(str(uri) + end for end in terminators)
    find_count = sum(uri.get_secret_count() for _ in terminators)

    cleaned = UriCleaner.remove_sensitive(cleartext)
    assert cleaned.count(UriCleaner.REPLACE_STR) == find_count
# should redact and replace multiple secret data within a complex cleartext blob # should redact and replace multiple secret data within a complex cleartext blob
def test_uri_scm_cleartext_redact_and_replace(): @pytest.mark.parametrize('test_data', TEST_CLEARTEXT)
for test_data in TEST_CLEARTEXT: def test_uri_scm_cleartext_redact_and_replace(test_data):
uri = test_data['uri'] uri = test_data['uri']
redacted_str = UriCleaner.remove_sensitive(test_data['text']) redacted_str = UriCleaner.remove_sensitive(test_data['text'])
assert uri.username not in redacted_str assert uri.username not in redacted_str
assert uri.password not in redacted_str assert uri.password not in redacted_str
# Ensure the host didn't get redacted # Ensure the host didn't get redacted
assert redacted_str.count(uri.host) == test_data['host_occurrences'] assert redacted_str.count(uri.host) == test_data['host_occurrences']