mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 06:51:10 +03:00
Merge pull request #5783 from jangsutsr/5718_enable_udp_communication_to_log_aggregators
Enable TCP/UDP communication to log aggregators
This commit is contained in:
commit
48934461c4
@ -320,9 +320,30 @@ register(
|
||||
'LOG_AGGREGATOR_TOWER_UUID',
|
||||
field_class=fields.CharField,
|
||||
allow_blank=True,
|
||||
default='',
|
||||
label=_('Cluster-wide Tower unique identifier.'),
|
||||
help_text=_('Useful to uniquely identify Tower instances.'),
|
||||
category=_('Logging'),
|
||||
category_slug='logging',
|
||||
default='',
|
||||
)
|
||||
register(
|
||||
'LOG_AGGREGATOR_PROTOCOL',
|
||||
field_class=fields.ChoiceField,
|
||||
choices=['https', 'tcp', 'udp'],
|
||||
default='https',
|
||||
label=_('Logging Aggregator Protocol'),
|
||||
help_text=_('Protocol used to communicate with log aggregator.'),
|
||||
category=_('Logging'),
|
||||
category_slug='logging',
|
||||
)
|
||||
register(
|
||||
'LOG_AGGREGATOR_TCP_TIMEOUT',
|
||||
field_class=fields.IntegerField,
|
||||
default=5,
|
||||
label=_('TCP Connection Timeout'),
|
||||
help_text=_('Number of seconds for a TCP connection to external log '
|
||||
'aggregator to timeout. Applies to HTTPS and TCP log '
|
||||
'aggregator protocols.'),
|
||||
category=_('Logging'),
|
||||
category_slug='logging',
|
||||
)
|
||||
|
@ -225,7 +225,9 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin):
|
||||
('LOG_AGGREGATOR_LOGGERS', ['awx', 'activity_stream', 'job_events', 'system_tracking']),
|
||||
('LOG_AGGREGATOR_INDIVIDUAL_FACTS', False),
|
||||
('LOG_AGGREGATOR_ENABLED', False),
|
||||
('LOG_AGGREGATOR_TOWER_UUID', '')
|
||||
('LOG_AGGREGATOR_TOWER_UUID', ''),
|
||||
('LOG_AGGREGATOR_PROTOCOL', 'https'),
|
||||
('LOG_AGGREGATOR_TCP_TIMEOUT', 5),
|
||||
]))
|
||||
|
||||
|
||||
|
@ -1,16 +1,21 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import base64
|
||||
import cStringIO
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
from uuid import uuid4
|
||||
|
||||
import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.conf import LazySettings
|
||||
import pytest
|
||||
import requests
|
||||
from requests_futures.sessions import FuturesSession
|
||||
|
||||
from awx.main.utils.handlers import (BaseHTTPSHandler as HTTPSHandler,
|
||||
from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler,
|
||||
TCPHandler, UDPHandler, _encode_payload_for_socket,
|
||||
PARAM_NAMES, LoggingConnectivityException)
|
||||
from awx.main.utils.formatters import LogstashFormatter
|
||||
|
||||
@ -57,6 +62,16 @@ def connection_error_adapter():
|
||||
return ConnectionErrorAdapter()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_socket(tmpdir_factory, request):
|
||||
sok = socket._socketobject
|
||||
sok.send = mock.MagicMock()
|
||||
sok.connect = mock.MagicMock()
|
||||
sok.setblocking = mock.MagicMock()
|
||||
sok.close = mock.MagicMock()
|
||||
return sok
|
||||
|
||||
|
||||
def test_https_logging_handler_requests_async_implementation():
|
||||
handler = HTTPSHandler()
|
||||
assert isinstance(handler.session, FuturesSession)
|
||||
@ -64,31 +79,121 @@ def test_https_logging_handler_requests_async_implementation():
|
||||
|
||||
def test_https_logging_handler_has_default_http_timeout():
|
||||
handler = HTTPSHandler.from_django_settings(settings)
|
||||
assert handler.http_timeout == 5
|
||||
assert handler.tcp_timeout == 5
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||
def test_https_logging_handler_defaults(param):
|
||||
handler = HTTPSHandler()
|
||||
def test_base_logging_handler_defaults(param):
|
||||
handler = BaseHandler()
|
||||
assert hasattr(handler, param) and getattr(handler, param) is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||
def test_https_logging_handler_kwargs(param):
|
||||
handler = HTTPSHandler(**{param: 'EXAMPLE'})
|
||||
def test_base_logging_handler_kwargs(param):
|
||||
handler = BaseHandler(**{param: 'EXAMPLE'})
|
||||
assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param, django_settings_name', PARAM_NAMES.items())
|
||||
def test_https_logging_handler_from_django_settings(param, django_settings_name):
|
||||
def test_base_logging_handler_from_django_settings(param, django_settings_name):
|
||||
settings = LazySettings()
|
||||
settings.configure(**{
|
||||
django_settings_name: 'EXAMPLE'
|
||||
})
|
||||
handler = HTTPSHandler.from_django_settings(settings)
|
||||
handler = BaseHandler.from_django_settings(settings)
|
||||
assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('params, logger_name, expected', [
|
||||
# skip all records if enabled_flag = False
|
||||
({'enabled_flag': False}, 'awx.main', True),
|
||||
# skip all records if the host is undefined
|
||||
({'host': '', 'enabled_flag': True}, 'awx.main', True),
|
||||
# skip all records if underlying logger is used by handlers themselves
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main.utils.handlers', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False),
|
||||
])
|
||||
def test_base_logging_handler_skip_log(params, logger_name, expected):
|
||||
handler = BaseHandler(**params)
|
||||
assert handler._skip_log(logger_name) is expected
|
||||
|
||||
|
||||
def test_base_logging_handler_emit(dummy_log_record):
|
||||
handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
sent_payloads = handler.emit(dummy_log_record)
|
||||
|
||||
assert len(sent_payloads) == 1
|
||||
body = json.loads(sent_payloads[0])
|
||||
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx'
|
||||
assert body['message'] == 'User joe logged in'
|
||||
|
||||
|
||||
def test_base_logging_handler_emit_one_record_per_fact():
|
||||
handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash', indv_facts=True,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
record = logging.LogRecord(
|
||||
'awx.analytics.system_tracking', # logger name
|
||||
20, # loglevel INFO
|
||||
'./awx/some/module.py', # pathname
|
||||
100, # lineno
|
||||
None, # msg
|
||||
tuple(), # args,
|
||||
None # exc_info
|
||||
)
|
||||
record.module_name = 'packages'
|
||||
record.facts_data = [{
|
||||
"name": "ansible",
|
||||
"version": "2.2.1.0"
|
||||
}, {
|
||||
"name": "ansible-tower",
|
||||
"version": "3.1.0"
|
||||
}]
|
||||
sent_payloads = handler.emit(record)
|
||||
|
||||
assert len(sent_payloads) == 2
|
||||
sent_payloads.sort(key=lambda payload: payload['version'])
|
||||
|
||||
assert sent_payloads[0]['level'] == 'INFO'
|
||||
assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert sent_payloads[0]['name'] == 'ansible'
|
||||
assert sent_payloads[0]['version'] == '2.2.1.0'
|
||||
assert sent_payloads[1]['level'] == 'INFO'
|
||||
assert sent_payloads[1]['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert sent_payloads[1]['name'] == 'ansible-tower'
|
||||
assert sent_payloads[1]['version'] == '3.1.0'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('host, port, normalized, hostname_only', [
|
||||
('localhost', None, 'http://localhost', False),
|
||||
('localhost', 8080, 'http://localhost:8080', False),
|
||||
('http://localhost', None, 'http://localhost', False),
|
||||
('http://localhost', 8080, 'http://localhost:8080', False),
|
||||
('https://localhost', 443, 'https://localhost:443', False),
|
||||
('ftp://localhost', 443, 'ftp://localhost:443', False),
|
||||
('https://localhost:550', 443, 'https://localhost:550', False),
|
||||
('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar', False),
|
||||
('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar', False),
|
||||
('http://splunk.server:8088/services/collector/event', 80,
|
||||
'http://splunk.server:8088/services/collector/event', False),
|
||||
('http://splunk.server/services/collector/event', 8088,
|
||||
'http://splunk.server:8088/services/collector/event', False),
|
||||
('localhost', 4399, 'localhost', True),
|
||||
('tcp://localhost:4399/foo/bar', 4399, 'localhost', True),
|
||||
])
|
||||
def test_base_logging_handler_host_format(host, port, normalized, hostname_only):
|
||||
handler = BaseHandler(host=host, port=port)
|
||||
assert handler._get_host(scheme='http', hostname_only=hostname_only) == normalized
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'status, reason, exc',
|
||||
[(200, '200 OK', None), (404, 'Not Found', LoggingConnectivityException)]
|
||||
@ -127,7 +232,7 @@ def test_https_logging_handler_connectivity_test(http_adapter, status, reason, e
|
||||
|
||||
def test_https_logging_handler_logstash_auth_info():
|
||||
handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible')
|
||||
handler.add_auth_information()
|
||||
handler._add_auth_information()
|
||||
assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth)
|
||||
assert handler.session.auth.username == 'bob'
|
||||
assert handler.session.auth.password == 'ansible'
|
||||
@ -135,47 +240,11 @@ def test_https_logging_handler_logstash_auth_info():
|
||||
|
||||
def test_https_logging_handler_splunk_auth_info():
|
||||
handler = HTTPSHandler(message_type='splunk', password='ansible')
|
||||
handler.add_auth_information()
|
||||
handler._add_auth_information()
|
||||
assert handler.session.headers['Authorization'] == 'Splunk ansible'
|
||||
assert handler.session.headers['Content-Type'] == 'application/json'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('host, port, normalized', [
|
||||
('localhost', None, 'http://localhost'),
|
||||
('localhost', 80, 'http://localhost'),
|
||||
('localhost', 8080, 'http://localhost:8080'),
|
||||
('http://localhost', None, 'http://localhost'),
|
||||
('http://localhost', 80, 'http://localhost'),
|
||||
('http://localhost', 8080, 'http://localhost:8080'),
|
||||
('https://localhost', 443, 'https://localhost:443'),
|
||||
('ftp://localhost', 443, 'ftp://localhost:443'),
|
||||
('https://localhost:550', 443, 'https://localhost:550'),
|
||||
('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar'),
|
||||
('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar'),
|
||||
('http://splunk.server:8088/services/collector/event', 80,
|
||||
'http://splunk.server:8088/services/collector/event'),
|
||||
('http://splunk.server/services/collector/event', 80,
|
||||
'http://splunk.server/services/collector/event'),
|
||||
('http://splunk.server/services/collector/event', 8088,
|
||||
'http://splunk.server:8088/services/collector/event'),
|
||||
])
|
||||
def test_https_logging_handler_http_host_format(host, port, normalized):
|
||||
handler = HTTPSHandler(host=host, port=port)
|
||||
assert handler.get_http_host() == normalized
|
||||
|
||||
|
||||
@pytest.mark.parametrize('params, logger_name, expected', [
|
||||
({'enabled_flag': False}, 'awx.main', True), # skip all records if enabled_flag = False
|
||||
({'host': '', 'enabled_flag': True}, 'awx.main', True), # skip all records if the host is undefined
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False),
|
||||
])
|
||||
def test_https_logging_handler_skip_log(params, logger_name, expected):
|
||||
handler = HTTPSHandler(**params)
|
||||
assert handler.skip_log(logger_name) is expected
|
||||
|
||||
|
||||
def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
dummy_log_record):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
@ -204,8 +273,8 @@ def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
|
||||
|
||||
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
||||
def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
message_type):
|
||||
def test_https_logging_handler_emit_without_cred(http_adapter, dummy_log_record,
|
||||
message_type):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type=message_type,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
@ -218,21 +287,14 @@ def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
request = http_adapter.requests[0]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
|
||||
if message_type == 'logstash':
|
||||
# A username + password weren't used, so this header should be missing
|
||||
assert 'Authorization' not in request.headers
|
||||
|
||||
if message_type == 'splunk':
|
||||
# splunk messages are nested under the 'event' key
|
||||
body = body['event']
|
||||
assert request.headers['Authorization'] == 'Splunk None'
|
||||
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx'
|
||||
assert body['message'] == 'User joe logged in'
|
||||
|
||||
|
||||
def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
|
||||
dummy_log_record):
|
||||
@ -265,49 +327,78 @@ def test_https_logging_handler_emit_splunk_with_creds(http_adapter,
|
||||
assert request.headers['Authorization'] == 'Splunk pass'
|
||||
|
||||
|
||||
def test_https_logging_handler_emit_one_record_per_fact(http_adapter):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash', indv_facts=True,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
@pytest.mark.parametrize('payload, encoded_payload', [
|
||||
('foobar', 'foobar'),
|
||||
({'foo': 'bar'}, '{"foo": "bar"}'),
|
||||
({u'测试键': u'测试值'}, '{"测试键": "测试值"}'),
|
||||
])
|
||||
def test_encode_payload_for_socket(payload, encoded_payload):
|
||||
assert _encode_payload_for_socket(payload) == encoded_payload
|
||||
|
||||
|
||||
def test_udp_handler_create_socket_at_init():
|
||||
handler = UDPHandler(host='127.0.0.1', port=4399,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
assert hasattr(handler, 'socket')
|
||||
assert isinstance(handler.socket, socket.socket)
|
||||
assert handler.socket.family == socket.AF_INET
|
||||
assert handler.socket.type == socket.SOCK_DGRAM
|
||||
|
||||
|
||||
def test_udp_handler_send(dummy_log_record):
|
||||
handler = UDPHandler(host='127.0.0.1', port=4399,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', http_adapter)
|
||||
record = logging.LogRecord(
|
||||
'awx.analytics.system_tracking', # logger name
|
||||
20, # loglevel INFO
|
||||
'./awx/some/module.py', # pathname
|
||||
100, # lineno
|
||||
None, # msg
|
||||
tuple(), # args,
|
||||
None # exc_info
|
||||
)
|
||||
record.module_name = 'packages'
|
||||
record.facts_data = [{
|
||||
"name": "ansible",
|
||||
"version": "2.2.1.0"
|
||||
}, {
|
||||
"name": "ansible-tower",
|
||||
"version": "3.1.0"
|
||||
}]
|
||||
async_futures = handler.emit(record)
|
||||
[future.result() for future in async_futures]
|
||||
with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\
|
||||
mock.patch.object(handler, 'socket') as socket_mock:
|
||||
handler.emit(dummy_log_record)
|
||||
encode_mock.assert_called_once_with(handler.format(dummy_log_record))
|
||||
socket_mock.sendto.assert_called_once_with("des", ('127.0.0.1', 4399))
|
||||
|
||||
assert len(http_adapter.requests) == 2
|
||||
requests = sorted(http_adapter.requests, key=lambda request: json.loads(request.body)['version'])
|
||||
|
||||
request = requests[0]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert body['name'] == 'ansible'
|
||||
assert body['version'] == '2.2.1.0'
|
||||
def test_tcp_handler_send(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [fake_socket], [])):
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
||||
fake_socket.setblocking.assert_called_once_with(0)
|
||||
fake_socket.send.assert_called_once_with(handler.format(dummy_log_record))
|
||||
fake_socket.close.assert_called_once()
|
||||
|
||||
request = requests[1]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert body['name'] == 'ansible-tower'
|
||||
assert body['version'] == '3.1.0'
|
||||
|
||||
def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [], [])):
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
||||
fake_socket.setblocking.assert_called_once_with(0)
|
||||
assert not fake_socket.send.called
|
||||
fake_socket.close.assert_called_once()
|
||||
|
||||
|
||||
def test_tcp_handler_log_exception(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [], [])),\
|
||||
mock.patch('awx.main.utils.handlers.logger') as logger_mock:
|
||||
fake_socket.connect.side_effect = Exception("foo")
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
logger_mock.exception.assert_called_once()
|
||||
fake_socket.close.assert_called_once()
|
||||
assert not fake_socket.send.called
|
||||
|
@ -7,6 +7,9 @@ import json
|
||||
import requests
|
||||
import time
|
||||
import urlparse
|
||||
import socket
|
||||
import select
|
||||
import six
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from copy import copy
|
||||
from requests.exceptions import RequestException
|
||||
@ -20,7 +23,8 @@ from requests_futures.sessions import FuturesSession
|
||||
from awx.main.utils.formatters import LogstashFormatter
|
||||
|
||||
|
||||
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
|
||||
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
|
||||
'configure_external_logger']
|
||||
|
||||
logger = logging.getLogger('awx.main.utils.handlers')
|
||||
|
||||
@ -39,7 +43,7 @@ PARAM_NAMES = {
|
||||
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
|
||||
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
|
||||
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
|
||||
'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT',
|
||||
'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
|
||||
}
|
||||
|
||||
|
||||
@ -84,16 +88,11 @@ class VerboseThreadPoolExecutor(ThreadPoolExecutor):
|
||||
**kwargs)
|
||||
|
||||
|
||||
class BaseHTTPSHandler(logging.Handler):
|
||||
def __init__(self, fqdn=False, **kwargs):
|
||||
super(BaseHTTPSHandler, self).__init__()
|
||||
self.fqdn = fqdn
|
||||
class BaseHandler(logging.Handler):
|
||||
def __init__(self, **kwargs):
|
||||
super(BaseHandler, self).__init__()
|
||||
for fd in PARAM_NAMES:
|
||||
setattr(self, fd, kwargs.get(fd, None))
|
||||
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
|
||||
max_workers=2 # this is the default used by requests_futures
|
||||
))
|
||||
self.add_auth_information()
|
||||
|
||||
@classmethod
|
||||
def from_django_settings(cls, settings, *args, **kwargs):
|
||||
@ -101,6 +100,113 @@ class BaseHTTPSHandler(logging.Handler):
|
||||
kwargs[param] = getattr(settings, django_setting_name, None)
|
||||
return cls(*args, **kwargs)
|
||||
|
||||
def get_full_message(self, record):
|
||||
if record.exc_info:
|
||||
return '\n'.join(traceback.format_exception(*record.exc_info))
|
||||
else:
|
||||
return record.getMessage()
|
||||
|
||||
def _send(self, payload):
|
||||
"""Actually send message to log aggregator.
|
||||
"""
|
||||
return payload
|
||||
|
||||
def _send_and_queue_system_tracking(self, payload_data):
|
||||
# Special action for System Tracking, queue up multiple log messages
|
||||
ret = []
|
||||
module_name = payload_data['module_name']
|
||||
if module_name in ['services', 'packages', 'files']:
|
||||
facts_dict = payload_data.pop(module_name)
|
||||
for key in facts_dict:
|
||||
fact_payload = copy(payload_data)
|
||||
fact_payload.update(facts_dict[key])
|
||||
ret.append(self._send(fact_payload))
|
||||
return ret
|
||||
|
||||
def _format_and_send_record(self, record):
|
||||
ret = []
|
||||
payload = self.format(record)
|
||||
if self.indv_facts:
|
||||
payload_data = json.loads(payload)
|
||||
if record.name.startswith('awx.analytics.system_tracking'):
|
||||
ret.extend(self._send_and_queue_system_tracking(payload_data))
|
||||
if len(ret) == 0:
|
||||
ret.append(self._send(payload))
|
||||
return ret
|
||||
|
||||
def _skip_log(self, logger_name):
|
||||
if self.host == '' or (not self.enabled_flag):
|
||||
return True
|
||||
# Don't send handler-related records.
|
||||
if logger_name == logger.name:
|
||||
return True
|
||||
# Tower log emission is only turned off by enablement setting
|
||||
if not logger_name.startswith('awx.analytics'):
|
||||
return False
|
||||
return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers
|
||||
|
||||
def emit(self, record):
|
||||
"""
|
||||
Emit a log record. Returns a list of zero or more
|
||||
implementation-specific objects for tests.
|
||||
"""
|
||||
if self._skip_log(record.name):
|
||||
return []
|
||||
try:
|
||||
return self._format_and_send_record(record)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except:
|
||||
self.handleError(record)
|
||||
|
||||
def _get_host(self, scheme='', hostname_only=False):
|
||||
"""Return the host name of log aggregator.
|
||||
"""
|
||||
host = self.host or ''
|
||||
# urlparse requires '//' to be provided if scheme is not specified
|
||||
if not urlparse.urlsplit(host).scheme and not host.startswith('//'):
|
||||
host = '%s://%s' % (scheme, host) if scheme else '//%s' % host
|
||||
parsed = urlparse.urlsplit(host)
|
||||
|
||||
if hostname_only:
|
||||
return parsed.hostname
|
||||
|
||||
try:
|
||||
port = parsed.port or self.port
|
||||
except ValueError:
|
||||
port = self.port
|
||||
netloc = parsed.netloc if port is None else '%s:%s' % (parsed.hostname, port)
|
||||
|
||||
url_components = list(parsed)
|
||||
url_components[1] = netloc
|
||||
ret = urlparse.urlunsplit(url_components)
|
||||
return ret.lstrip('/')
|
||||
|
||||
|
||||
class BaseHTTPSHandler(BaseHandler):
|
||||
def _add_auth_information(self):
|
||||
if self.message_type == 'logstash':
|
||||
if not self.username:
|
||||
# Logstash authentication not enabled
|
||||
return
|
||||
logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password)
|
||||
self.session.auth = logstash_auth
|
||||
elif self.message_type == 'splunk':
|
||||
auth_header = "Splunk %s" % self.password
|
||||
headers = {
|
||||
"Authorization": auth_header,
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
self.session.headers.update(headers)
|
||||
|
||||
def __init__(self, fqdn=False, **kwargs):
|
||||
self.fqdn = fqdn
|
||||
super(BaseHTTPSHandler, self).__init__(**kwargs)
|
||||
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
|
||||
max_workers=2 # this is the default used by requests_futures
|
||||
))
|
||||
self._add_auth_information()
|
||||
|
||||
@classmethod
|
||||
def perform_test(cls, settings):
|
||||
"""
|
||||
@ -126,47 +232,7 @@ class BaseHTTPSHandler(logging.Handler):
|
||||
except RequestException as e:
|
||||
raise LoggingConnectivityException(str(e))
|
||||
|
||||
def get_full_message(self, record):
|
||||
if record.exc_info:
|
||||
return '\n'.join(traceback.format_exception(*record.exc_info))
|
||||
else:
|
||||
return record.getMessage()
|
||||
|
||||
def add_auth_information(self):
|
||||
if self.message_type == 'logstash':
|
||||
if not self.username:
|
||||
# Logstash authentication not enabled
|
||||
return
|
||||
logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password)
|
||||
self.session.auth = logstash_auth
|
||||
elif self.message_type == 'splunk':
|
||||
auth_header = "Splunk %s" % self.password
|
||||
headers = {
|
||||
"Authorization": auth_header,
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
self.session.headers.update(headers)
|
||||
|
||||
def get_http_host(self):
|
||||
host = self.host or ''
|
||||
# urlparse requires scheme to be provided, default to use http if
|
||||
# missing
|
||||
if not urlparse.urlsplit(host).scheme:
|
||||
host = 'http://%s' % host
|
||||
parsed = urlparse.urlsplit(host)
|
||||
# Insert self.port if its special and port number is either not
|
||||
# given in host or given as non-numerical
|
||||
try:
|
||||
port = parsed.port or self.port
|
||||
except ValueError:
|
||||
port = self.port
|
||||
if port not in (80, None):
|
||||
new_netloc = '%s:%s' % (parsed.hostname, port)
|
||||
return urlparse.urlunsplit((parsed.scheme, new_netloc, parsed.path,
|
||||
parsed.query, parsed.fragment))
|
||||
return host
|
||||
|
||||
def get_post_kwargs(self, payload_input):
|
||||
def _get_post_kwargs(self, payload_input):
|
||||
if self.message_type == 'splunk':
|
||||
# Splunk needs data nested under key "event"
|
||||
if not isinstance(payload_input, dict):
|
||||
@ -177,56 +243,64 @@ class BaseHTTPSHandler(logging.Handler):
|
||||
else:
|
||||
payload_str = payload_input
|
||||
return dict(data=payload_str, background_callback=unused_callback,
|
||||
timeout=self.http_timeout)
|
||||
timeout=self.tcp_timeout)
|
||||
|
||||
def skip_log(self, logger_name):
|
||||
if self.host == '' or (not self.enabled_flag):
|
||||
return True
|
||||
if not logger_name.startswith('awx.analytics'):
|
||||
# Tower log emission is only turned off by enablement setting
|
||||
return False
|
||||
return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers
|
||||
|
||||
def emit(self, record):
|
||||
"""
|
||||
Emit a log record. Returns a list of zero or more
|
||||
``concurrent.futures.Future`` objects.
|
||||
|
||||
See:
|
||||
def _send(self, payload):
|
||||
"""See:
|
||||
https://docs.python.org/3/library/concurrent.futures.html#future-objects
|
||||
http://pythonhosted.org/futures/
|
||||
"""
|
||||
if self.skip_log(record.name):
|
||||
return []
|
||||
return self.session.post(self._get_host(scheme='http'),
|
||||
**self._get_post_kwargs(payload))
|
||||
|
||||
|
||||
def _encode_payload_for_socket(payload):
|
||||
encoded_payload = payload
|
||||
if isinstance(encoded_payload, dict):
|
||||
encoded_payload = json.dumps(encoded_payload, ensure_ascii=False)
|
||||
if isinstance(encoded_payload, six.text_type):
|
||||
encoded_payload = encoded_payload.encode('utf-8')
|
||||
return encoded_payload
|
||||
|
||||
|
||||
class TCPHandler(BaseHandler):
|
||||
def _send(self, payload):
|
||||
payload = _encode_payload_for_socket(payload)
|
||||
sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
payload = self.format(record)
|
||||
sok.connect((self._get_host(hostname_only=True), self.port or 0))
|
||||
sok.setblocking(0)
|
||||
_, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout))
|
||||
if len(ready_to_send) == 0:
|
||||
logger.warning("Socket currently busy, failed to send message")
|
||||
sok.close()
|
||||
return
|
||||
sok.send(payload)
|
||||
except Exception as e:
|
||||
logger.exception("Error sending message from %s: %s" %
|
||||
(TCPHandler.__name__, e.message))
|
||||
sok.close()
|
||||
|
||||
# Special action for System Tracking, queue up multiple log messages
|
||||
if self.indv_facts:
|
||||
payload_data = json.loads(payload)
|
||||
if record.name.startswith('awx.analytics.system_tracking'):
|
||||
module_name = payload_data['module_name']
|
||||
if module_name in ['services', 'packages', 'files']:
|
||||
facts_dict = payload_data.pop(module_name)
|
||||
async_futures = []
|
||||
for key in facts_dict:
|
||||
fact_payload = copy(payload_data)
|
||||
fact_payload.update(facts_dict[key])
|
||||
async_futures.append(self._send(fact_payload))
|
||||
return async_futures
|
||||
|
||||
return [self._send(payload)]
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except:
|
||||
self.handleError(record)
|
||||
class UDPHandler(BaseHandler):
|
||||
def __init__(self, **kwargs):
|
||||
super(UDPHandler, self).__init__(**kwargs)
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
def _send(self, payload):
|
||||
return self.session.post(self.get_http_host(),
|
||||
**self.get_post_kwargs(payload))
|
||||
payload = _encode_payload_for_socket(payload)
|
||||
return self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
|
||||
|
||||
|
||||
def add_or_remove_logger(address, instance):
|
||||
HANDLER_MAPPING = {
|
||||
'https': BaseHTTPSHandler,
|
||||
'tcp': TCPHandler,
|
||||
'udp': UDPHandler,
|
||||
}
|
||||
|
||||
|
||||
def _add_or_remove_logger(address, instance):
|
||||
specific_logger = logging.getLogger(address)
|
||||
for i, handler in enumerate(specific_logger.handlers):
|
||||
if isinstance(handler, (HTTPSNullHandler, BaseHTTPSHandler)):
|
||||
@ -238,7 +312,6 @@ def add_or_remove_logger(address, instance):
|
||||
|
||||
|
||||
def configure_external_logger(settings_module, is_startup=True):
|
||||
|
||||
is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
|
||||
if is_startup and (not is_enabled):
|
||||
# Pass-through if external logging not being used
|
||||
@ -246,11 +319,12 @@ def configure_external_logger(settings_module, is_startup=True):
|
||||
|
||||
instance = None
|
||||
if is_enabled:
|
||||
instance = BaseHTTPSHandler.from_django_settings(settings_module)
|
||||
handler_class = HANDLER_MAPPING[settings_module.LOG_AGGREGATOR_PROTOCOL]
|
||||
instance = handler_class.from_django_settings(settings_module)
|
||||
instance.setFormatter(LogstashFormatter(settings_module=settings_module))
|
||||
awx_logger_instance = instance
|
||||
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
|
||||
awx_logger_instance = None
|
||||
|
||||
add_or_remove_logger('awx.analytics', instance)
|
||||
add_or_remove_logger('awx', awx_logger_instance)
|
||||
_add_or_remove_logger('awx.analytics', instance)
|
||||
_add_or_remove_logger('awx', awx_logger_instance)
|
||||
|
@ -877,7 +877,7 @@ INSIGHTS_URL_BASE = "https://access.redhat.com"
|
||||
TOWER_SETTINGS_MANIFEST = {}
|
||||
|
||||
LOG_AGGREGATOR_ENABLED = False
|
||||
LOG_AGGREGATOR_HTTP_TIMEOUT = 5
|
||||
LOG_AGGREGATOR_TCP_TIMEOUT = 5
|
||||
|
||||
# The number of retry attempts for websocket session establishment
|
||||
# If you're encountering issues establishing websockets in clustered Tower,
|
||||
|
@ -4,7 +4,8 @@ This feature builds in the capability to send detailed logs to several kinds
|
||||
of 3rd party external log aggregation services. Services connected to this
|
||||
data feed should be useful in order to gain insights into Tower usage
|
||||
or technical trends. The data is intended to be
|
||||
sent in JSON format over a HTTP connection using minimal service-specific
|
||||
sent in JSON format via three ways: over a HTTP connection, a direct TCP
|
||||
connection or a direct UDP connection. It uses minimal service-specific
|
||||
tweaks engineered in a custom handler or via an imported library.
|
||||
|
||||
## Loggers
|
||||
@ -169,14 +170,24 @@ supported services:
|
||||
- A flag to indicate how system tracking records will be sent
|
||||
- Selecting which loggers to send
|
||||
- Enabling sending logs
|
||||
- Connection type, which can be HTTPS, TCP and UDP.
|
||||
- Timeout value if connection type is based on TCP protocol (HTTPS and TCP).
|
||||
|
||||
Some settings for the log handler will not be exposed to the user via
|
||||
this mechanism. In particular, threading (enabled), and connection type
|
||||
(designed for HTTP/HTTPS).
|
||||
this mechanism. For example, threading (enabled).
|
||||
|
||||
Parameters for the items listed above should be configurable through
|
||||
the Configure-Tower-in-Tower interface.
|
||||
|
||||
One note on configuring Host and Port: When entering URL it is customary to
|
||||
include port number, like `https://localhost:4399/foo/bar`. So for the convenience
|
||||
of users, when connection type is HTTPS, we allow entering hostname as a URL
|
||||
with port number and thus ignore Port field. In other words, Port field is
|
||||
optional in this case. On the other hand, TCP and UDP connections are determined
|
||||
by `<hostname, port number>` tuple rather than URL. So in the case of TCP/UDP
|
||||
connection, Port field is supposed to be provided and Host field is supposed to
|
||||
contain hostname only. If instead a URL is entered in Host field, its hostname
|
||||
portion will be extracted as the actual hostname.
|
||||
|
||||
# Acceptance Criteria Notes
|
||||
|
||||
@ -204,4 +215,3 @@ request-response cycle. For example, loggly examples use
|
||||
threading work to fire the message without interfering with other
|
||||
operations. A timeout on the part of the log aggregation service should
|
||||
not cause Tower operations to hang.
|
||||
|
||||
|
@ -4,6 +4,12 @@ input {
|
||||
user => awx_logger
|
||||
password => "workflows"
|
||||
}
|
||||
udp {
|
||||
port => 8086
|
||||
}
|
||||
tcp {
|
||||
port => 8087
|
||||
}
|
||||
}
|
||||
|
||||
## Add your filters / logstash plugins configuration here
|
||||
|
@ -35,8 +35,8 @@ by going to `{server address}:5601`.
|
||||
|
||||
### Authentication
|
||||
|
||||
The default logstash configuration makes use of basic auth, so a username
|
||||
and password is needed in the configuration, in addition to the other
|
||||
The default HTTPS logstash configuration makes use of basic auth, so a username
|
||||
and password is needed in HTTPS configuration, in addition to the other
|
||||
parameters. The following settings are supported:
|
||||
|
||||
```
|
||||
@ -53,10 +53,47 @@ parameters. The following settings are supported:
|
||||
"system_tracking"
|
||||
],
|
||||
"LOG_AGGREGATOR_INDIVIDUAL_FACTS": false,
|
||||
"LOG_AGGREGATOR_ENABLED": true
|
||||
"LOG_AGGREGATOR_ENABLED": true,
|
||||
"LOG_AGGREGATOR_PROTOCOL": "https",
|
||||
"LOG_AGGREGATOR_TCP_TIMEOUT": 5
|
||||
}
|
||||
```
|
||||
and
|
||||
```
|
||||
{
|
||||
"LOG_AGGREGATOR_HOST": "logstash",
|
||||
"LOG_AGGREGATOR_PORT": 8086,
|
||||
"LOG_AGGREGATOR_TYPE": "logstash",
|
||||
"LOG_AGGREGATOR_LOGGERS": [
|
||||
"awx",
|
||||
"activity_stream",
|
||||
"job_events",
|
||||
"system_tracking"
|
||||
],
|
||||
"LOG_AGGREGATOR_INDIVIDUAL_FACTS": false,
|
||||
"LOG_AGGREGATOR_ENABLED": true,
|
||||
"LOG_AGGREGATOR_PROTOCOL": "udp",
|
||||
"LOG_AGGREGATOR_TCP_TIMEOUT": 5
|
||||
}
|
||||
```
|
||||
and
|
||||
```
|
||||
{
|
||||
"LOG_AGGREGATOR_HOST": "logstash",
|
||||
"LOG_AGGREGATOR_PORT": 8087,
|
||||
"LOG_AGGREGATOR_TYPE": "logstash",
|
||||
"LOG_AGGREGATOR_LOGGERS": [
|
||||
"awx",
|
||||
"activity_stream",
|
||||
"job_events",
|
||||
"system_tracking"
|
||||
],
|
||||
"LOG_AGGREGATOR_INDIVIDUAL_FACTS": false,
|
||||
"LOG_AGGREGATOR_ENABLED": true,
|
||||
"LOG_AGGREGATOR_PROTOCOL": "tcp",
|
||||
"LOG_AGGREGATOR_TCP_TIMEOUT": 5
|
||||
}
|
||||
```
|
||||
|
||||
These can be entered via Configure-Tower-in-Tower by making a POST to
|
||||
`/api/v1/settings/logging/`.
|
||||
|
||||
@ -81,4 +118,3 @@ Nov 18, 2016
|
||||
|
||||
- Original branch point `b5a4deee142b152d4f9232ebac5bbabb2d2cef3c`
|
||||
Sep 25, 2016, before X-Pack support
|
||||
|
||||
|
@ -3,4 +3,4 @@ services:
|
||||
# Primary Tower Development Container
|
||||
tower:
|
||||
links:
|
||||
- logstash
|
||||
- logstash
|
||||
|
Loading…
Reference in New Issue
Block a user