Files
auditd-plugin-clickhouse/auditd-plugin-clickhouse.cpp
Aleksei Nikiforov 8c31581728 Improved logging
It should better indicate when and why it stops running.
Also added log about finishing processing saved data when no writer thread is used.
2020-01-30 13:32:42 +03:00

1100 lines
32 KiB
C++

/*
* auditd-plugin-clickhouse is an auditd plugin for sending auditd data
* to clickhouse DB.
* Copyright (C) 2019-2020 Aleksei Nikiforov <darktemplar@basealt.ru>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <fstream>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <regex>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include <auparse.h>
#include <libaudit.h>
#include <clickhouse-cpp/client.h>
#include <boost/scope_exit.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/optional.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include "auditd-datatypes.hpp"
#include "logging.hpp"
#include "utils.hpp"
// Self-pipe used to wake up the main poll() loop when processing should stop:
// stop_running() writes to runpipes[1], main() polls runpipes[0].
int runpipes[2] = { -1, -1 };
// Cleared by stop_running(), which is called from the SIGTERM handler and from
// the writer thread, and read by the main loop in main(). A volatile bool gives
// no guarantees for cross-thread access (data race); std::atomic does.
// Accessing an atomic from a signal handler is only safe if it is lock-free,
// which is checked at compile time below.
std::atomic<bool> running(true);
// Set by the SIGTERM handler, reported by main() when the main loop finishes.
std::atomic<bool> got_signal(false);
static_assert(ATOMIC_BOOL_LOCK_FREE == 2, "std::atomic<bool> must be lock-free to be used from a signal handler");
// Any byte written to the control pipe asks the writer thread to flush pending data and exit.
int write_thread_control_pipes[2] = { -1, -1 };
// A byte written to the data pipe tells the writer thread that the record count limit was reached.
int write_thread_data_pipes[2] = { -1, -1 };
// Protects g_audit_records_list, which is accessed from the auparse callback,
// from main() and from the writer thread.
std::mutex g_audit_records_list_mutex{};
// Records waiting for the writer thread. New records are pushed to the front,
// so the oldest pending record is at the back of the list.
std::list<std::shared_ptr<AuditRecord>> g_audit_records_list{};
/**
 * Comparator for std::shared_ptr<T> that orders by the pointed-to values.
 *
 * When both pointers are non-null the objects are compared with T::operator<.
 * If either pointer is null, the raw addresses are compared with std::less,
 * which (unlike built-in < on unrelated pointers) yields a total order.
 *
 * It intentionally does not derive from std::binary_function: that helper was
 * deprecated in C++11 and removed in C++17, and std::set does not need its
 * typedefs. operator() is not constexpr because shared_ptr dereferencing is
 * not a constant expression.
 */
template <typename T>
struct SharedPointerLess
{
    bool operator()(const std::shared_ptr<T> &lhs, const std::shared_ptr<T> &rhs) const
    {
        if (lhs && rhs)
        {
            return *lhs < *rhs;
        }
        return std::less<T*>()(lhs.get(), rhs.get());
    }
};
/**
 * State passed to auparse_callback() through auparse's user_data pointer.
 *
 * When clickhouse_client is non-null and table_name is non-empty, the callback
 * inserts records directly; otherwise records are queued for the writer thread.
 * write_timeout and write_count_limit default to -1.
 */
struct CallbackData
{
    // audit field name -> database type name
    std::map<std::string, std::string> datatypes_map;
    // (field name regex, database type name, database column name), tried in order
    std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
    // database type name -> factory creating an empty record field of that type
    std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>()> > type_creation_map;
    // names of all configured database fields (filled in by main())
    std::set<std::string> all_fields_set;
    clickhouse::Client *clickhouse_client = nullptr;
    std::string table_name;
    int write_timeout = -1;
    int write_count_limit = -1;
    // directory where each record is saved as JSON until it is written; empty disables saving
    std::string data_directory;

    CallbackData() = default;
};
/**
 * Requests shutdown of the main loop.
 *
 * Clears the global running flag and, if the self-pipe exists, writes one byte
 * to it so that a blocked poll() in main() returns. Besides the flag store it
 * only calls write(2), so it is also used from the SIGTERM handler.
 */
static void stop_running()
{
    running = false;
    const int wakeup_fd = runpipes[1];
    if (wakeup_fd == -1)
    {
        return;
    }
    write(wakeup_fd, "1", 1);
}
/**
 * SIGTERM handler: remembers that a signal arrived and requests shutdown.
 *
 * errno is saved and restored around the handler body. stop_running() calls
 * write(2), which may set errno, and a signal handler must not change the
 * errno value seen by whatever code (in any thread) it interrupted.
 *
 * @param sig signal number (unused; only SIGTERM is routed here)
 */
static void term_handler(int sig)
{
    (void) sig;
    const int saved_errno = errno;
    got_signal = true;
    stop_running();
    errno = saved_errno;
}
void initialize_data_block(
std::map<std::string, clickhouse::ColumnRef> &data,
std::map<std::string, bool> &data_present_map,
std::map<std::string, std::vector<AbstractRecordField::Column> > &empty_data_map,
const std::map<std::string, std::string> &datatypes_map,
const std::list<std::tuple<std::string, std::string, std::string> > &datatype_regexps_map,
const std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>()> > &generators)
{
data["record_time"] = std::make_shared<clickhouse::ColumnDateTime>();
data["record_milli"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_serial"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_node"] = std::make_shared<clickhouse::ColumnString>();
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
{
auto factory_iter = generators.find(iter->second);
if (factory_iter != generators.end())
{
auto columns = factory_iter->second()->generateColumnsAndNames(sanitize_column_name(iter->first));
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
data_present_map[iter->first] = false;
auto empty_record = factory_iter->second();
auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name(iter->first));
empty_record->addToColumn(empty_columns);
empty_data_map[sanitize_column_name(iter->first)] = empty_columns;
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"", iter->second.c_str());
}
}
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
{
auto factory_iter = generators.find(std::get<1>(*iter));
if (factory_iter != generators.end())
{
auto columns = factory_iter->second()->generateColumnsAndNames(sanitize_column_name(std::get<2>(*iter)));
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
data_present_map[std::get<2>(*iter)] = false;
auto empty_record = factory_iter->second();
auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name(std::get<2>(*iter)));
empty_record->addToColumn(empty_columns);
empty_data_map[sanitize_column_name(std::get<2>(*iter))] = empty_columns;
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"", std::get<2>(*iter).c_str());
}
}
// also add "unknown_field"
{
auto columns = InterpretedStringArrayRecordField::createRecord()->generateColumnsAndNames(sanitize_column_name("unknown_field"));
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
data_present_map["unknown_field"] = false;
auto empty_record = InterpretedStringArrayRecordField::createRecord();
auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name("unknown_field"));
empty_record->addToColumn(empty_columns);
empty_data_map[sanitize_column_name("unknown_field")] = empty_columns;
}
}
void generate_clickhouse_columns_from_audit_records(
std::map<std::string, clickhouse::ColumnRef> &data,
std::map<std::string, bool> &data_present_map,
const AuditRecord &record)
{
ensure_not_null(data["record_time"]->As<clickhouse::ColumnDateTime>())->Append(record.time);
ensure_not_null(data["record_milli"]->As<clickhouse::ColumnUInt64>())->Append(record.milliseconds);
ensure_not_null(data["record_serial"]->As<clickhouse::ColumnUInt64>())->Append(record.serial);
ensure_not_null(data["record_node"]->As<clickhouse::ColumnString>())->Append(record.node);
for (auto iter = record.fields.begin(); iter != record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames(sanitize_column_name(iter->first));
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name]->Append(column_iter->value);
}
data_present_map[iter->first] = true;
}
}
/**
 * Pads the current row for every field the row did not set.
 *
 * For each field whose data_present_map value is false, the field's
 * precomputed empty values (looked up in empty_data_map by sanitized name) are
 * appended to the matching columns, keeping all columns the same length.
 * Fields without an entry in empty_data_map are skipped.
 */
void fill_empty_columns(
    std::map<std::string, clickhouse::ColumnRef> &data,
    const std::map<std::string, bool> &data_present_map,
    const std::map<std::string, std::vector<AbstractRecordField::Column> > &empty_data_map)
{
    for (const auto &presence : data_present_map)
    {
        if (!presence.second)
        {
            const auto empty_entry = empty_data_map.find(sanitize_column_name(presence.first));
            if (empty_entry == empty_data_map.end())
            {
                continue;
            }
            for (const auto &empty_column : empty_entry->second)
            {
                data[empty_column.name]->Append(empty_column.value);
            }
        }
    }
}
/**
 * Verifies that all columns in data have the same number of rows.
 *
 * An empty map is trivially consistent. Previously begin() of an empty map was
 * dereferenced, which is undefined behavior.
 *
 * @throws std::runtime_error naming the first column whose size differs from
 *         the size of the first column in map order
 */
void check_column_sizes_in_data(const std::map<std::string, clickhouse::ColumnRef> &data)
{
    if (data.empty())
    {
        return;
    }
    const auto reference = data.begin();
    const size_t expected_size = reference->second->Size();
    auto iter = reference;
    for (++iter; iter != data.end(); ++iter)
    {
        const size_t actual_size = iter->second->Size();
        if (actual_size != expected_size)
        {
            std::stringstream errorstr;
            errorstr << "Columns size doesn't match, expected " << expected_size << ", got " << actual_size
                << " (column \"" << iter->first << "\", expected size taken from column \"" << reference->first << "\")";
            throw std::runtime_error(errorstr.str());
        }
    }
}
/**
 * Appends every column of data to block under its map key, in the map's key order.
 */
void add_audit_data_to_clickhouse_block(
    const std::map<std::string, clickhouse::ColumnRef> &data,
    clickhouse::Block &block)
{
    for (const auto &named_column : data)
    {
        block.AppendColumn(named_column.first, named_column.second);
    }
}
/**
 * auparse event callback. main() registers it with auparse_add_callback(),
 * with user_data pointing to its CallbackData instance.
 *
 * For an AUPARSE_CB_EVENT_READY event, every record of the event becomes one
 * AuditRecord. It holds the event's time, milliseconds, serial and node, plus
 * all fields of that record except "node". For other callback event types
 * the records are walked without doing anything.
 *
 * Field-to-column mapping:
 * - the field name is looked up in datatypes_map first;
 * - if absent, the regexps in datatype_regexps_map are tried in order and the
 *   first full match wins;
 * - if nothing matches, the field goes into the "unknown_field" string_array field.
 * Several audit fields mapped to the same database field within one record
 * share one field object (addOrUpdateValue is called on it again).
 *
 * If data_directory is set, the record is first saved there as JSON. It is then
 * handled in one of two ways:
 * - Direct insert (clickhouse_client and table_name set). The JSON file is
 *   removed after a successful insert; if the insert throws, the file stays
 *   and main() loads it on the next start.
 * - Queued in g_audit_records_list for the writer thread. The thread is
 *   notified through write_thread_data_pipes whenever the queue size is a
 *   multiple of write_count_limit.
 *
 * Exceptions are caught and logged per record, so one bad record does not stop
 * processing of the remaining records of the event.
 */
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
    CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
    // nothing to do without callback data or when the event has no records
    if ((!callback_data) || (auparse_first_record(au) <= 0))
    {
        return;
    }
    // 1-based index of the current record within the event (loop control and log messages)
    size_t record = 1;
    for (;;)
    {
        try
        {
            if (cb_event_type == AUPARSE_CB_EVENT_READY)
            {
                std::shared_ptr<AuditRecord> audit_record = std::make_shared<AuditRecord>();
                audit_record->time = auparse_get_time(au);
                audit_record->milliseconds = auparse_get_milli(au);
                audit_record->serial = auparse_get_serial(au);
                audit_record->node = string_or_null_and_free(auparse_get_node(au));
                if (auparse_first_field(au) > 0)
                {
                    do
                    {
                        const std::string field_name = string_or_null(auparse_get_field_name(au));
                        if (field_name != "node") // skip node since it's already processed
                        {
                            auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));
                            std::string database_type;
                            std::string database_name;
                            // first search for this field name in datatypes map,
                            // if it's not found there search all elements in datatypes regexp container
                            {
                                auto iter = callback_data->datatypes_map.find(field_name);
                                if (iter != callback_data->datatypes_map.end())
                                {
                                    database_type = iter->second;
                                    database_name = iter->first;
                                }
                                else
                                {
                                    for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
                                    {
                                        // NOTE(review): the pattern is compiled into a std::regex again for every
                                        // field lookup; precompiling the patterns once would be cheaper.
                                        std::regex audit_name_regex(std::get<0>(*regexp_iter));
                                        if (std::regex_match(field_name, audit_name_regex))
                                        {
                                            database_type = std::get<1>(*regexp_iter);
                                            database_name = std::get<2>(*regexp_iter);
                                            break;
                                        }
                                    }
                                }
                            }
                            if (database_type.empty() || database_name.empty())
                            {
                                Logger::write("Couldn't find matching database entry for field with name \"%s\" and type \"%s\", putting it into \"unknown_field\" field", field_name.c_str(), field_type_to_string(field_type).c_str());
                                database_type = "string_array";
                                database_name = "unknown_field";
                            }
                            else if (!check_field_type(field_type, database_type, field_name))
                            {
                                // only a warning: the value is still stored using the configured database type
                                Logger::write("Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual \"%s\"",
                                    field_name.c_str(), database_type.c_str(), field_type_to_string(field_type).c_str());
                            }
                            std::shared_ptr<AbstractRecordField> data_ptr;
                            // If field is present in audit record, reuse it
                            // and update it's value,
                            // otherwise create new one and register it
                            {
                                auto data_iter = audit_record->fields.find(database_name);
                                if (data_iter != audit_record->fields.end())
                                {
                                    data_ptr = data_iter->second;
                                }
                                else
                                {
                                    auto iter = callback_data->type_creation_map.find(database_type);
                                    if (iter != callback_data->type_creation_map.end())
                                    {
                                        data_ptr = iter->second();
                                    }
                                    else
                                    {
                                        Logger::write("Warning: no creator function found for data type \"%s\", using \"string\" as fallback", database_type.c_str());
                                        data_ptr = InterpretedStringRecordField::createRecord();
                                    }
                                    audit_record->fields[database_name] = data_ptr;
                                }
                            }
                            data_ptr->addOrUpdateValue(au);
                        }
                    } while (auparse_next_field(au) > 0);
                }
                // persist the record first, so it survives a failed or delayed insert
                if (not callback_data->data_directory.empty())
                {
                    audit_record->filename = callback_data->data_directory + boost::filesystem::path::separator + generate_name_for_audit_record(*audit_record);
                    boost::property_tree::write_json(audit_record->filename, audit_record->toPtree());
                }
                if (callback_data->clickhouse_client && (!callback_data->table_name.empty()))
                {
                    // send data directly
                    clickhouse::Block block;
                    std::map<std::string, clickhouse::ColumnRef> data;
                    std::map<std::string, bool> data_present_map;
                    std::map<std::string, std::vector<AbstractRecordField::Column> > empty_data_map;
                    initialize_data_block(data, data_present_map, empty_data_map, callback_data->datatypes_map, callback_data->datatype_regexps_map, callback_data->type_creation_map);
                    generate_clickhouse_columns_from_audit_records(data, data_present_map, *audit_record);
                    fill_empty_columns(data, data_present_map, empty_data_map);
                    check_column_sizes_in_data(data);
                    add_audit_data_to_clickhouse_block(data, block);
                    callback_data->clickhouse_client->Insert(callback_data->table_name, block);
                    // Data written, remove it
                    if (not audit_record->filename.empty())
                    {
                        boost::filesystem::remove(audit_record->filename);
                    }
                }
                else
                {
                    // send data to buffering thread and notify it if necessary
                    size_t current_count = 0;
                    { // lock
                        std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
                        // newest record at the front; the writer thread writes from the back (oldest first)
                        g_audit_records_list.push_front(audit_record);
                        current_count = g_audit_records_list.size();
                    } //unlock
                    // notify whenever the queue size reaches a multiple of the limit
                    if ((callback_data->write_count_limit > 0) && (current_count % static_cast<size_t>(callback_data->write_count_limit) == 0))
                    {
                        if (write_thread_data_pipes[1] != -1)
                        {
                            write(write_thread_data_pipes[1], "1", 1);
                        }
                    }
                }
            }
        }
        catch (const std::exception &e)
        {
            Logger::write("Caught exception while processing audit record %zu/%zu: %s", record, (size_t) auparse_get_num_records(au), e.what());
        }
        catch (...)
        {
            Logger::write("Caught unknown exception while processing audit record %zu/%zu", record, (size_t) auparse_get_num_records(au));
        }
        // move on to the next record of the event, or stop after the last one
        if (auparse_get_num_records(au) > record)
        {
            ++record;
            auparse_next_record(au);
        }
        else
        {
            break;
        }
    }
}
void writer_thread_function(clickhouse::Client &client, const std::string &table_name, int timeout, int count_limit, const std::string &data_directory)
{
struct pollfd pollfds[2];
pollfds[0].fd = write_thread_control_pipes[0];
pollfds[1].fd = write_thread_data_pipes[0];
auto current_time = std::chrono::steady_clock::now();
auto last_time = current_time;
bool thread_keep_running = true;
while (thread_keep_running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), timeout);
current_time = std::chrono::steady_clock::now();
bool write_data = false;
bool poll_timeout = false;
if (res < 0)
{
if (errno != EINTR)
{
// error occured, finish running
Logger::write("Poll returned error: %d", errno);
stop_running();
thread_keep_running = false;
}
}
else if (res == 0)
{
// timeout
write_data = true;
poll_timeout = true;
}
else if (pollfds[0].revents & POLLIN)
{
// input data in control pipe means shutdown request
thread_keep_running = false;
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// pipe closed too early, an error
Logger::write("Read returned pollhup or pollerr");
stop_running();
thread_keep_running = false;
}
else if (pollfds[1].revents & POLLIN)
{
// data was received, and records count exceeds limit, then write it
{
char c;
read(pollfds[1].fd, &c, sizeof(c));
}
write_data = true;
}
else if ((pollfds[1].revents & POLLHUP) || (pollfds[1].revents & POLLERR))
{
// pipe closed too early, an error
Logger::write("Read returned pollhup or pollerr");
stop_running();
thread_keep_running = false;
}
if (write_data || (!thread_keep_running))
{
try
{
// write all saved up data and log about it
std::list<std::shared_ptr<AuditRecord> > local_audit_records_list;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
local_audit_records_list = g_audit_records_list;
} // unlock
bool write_and_log = (poll_timeout
|| (!thread_keep_running)
|| ((count_limit > 0) && (local_audit_records_list.size() >= static_cast<size_t>(count_limit))));
if (write_and_log && (!local_audit_records_list.empty()))
{
Logger::write("Writer thread: preparing to write %zu elements", local_audit_records_list.size());
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef> data;
std::map<std::string, bool> data_present_map;
std::map<std::string, std::vector<AbstractRecordField::Column> > empty_data_map;
initialize_data_block(data, data_present_map, empty_data_map, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());
for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
{
generate_clickhouse_columns_from_audit_records(data, data_present_map, **iter);
fill_empty_columns(data, data_present_map, empty_data_map);
check_column_sizes_in_data(data);
// reset presence indication for next iteration
for (auto data_present_iter = data_present_map.begin(); data_present_iter != data_present_map.end(); ++data_present_iter)
{
data_present_iter->second = false;
}
}
add_audit_data_to_clickhouse_block(data, block);
client.Insert(table_name, block);
if (not data_directory.empty())
{
for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
{
try
{
boost::filesystem::remove((*iter)->filename);
}
catch (const std::exception &e)
{
Logger::write("Writer thread: caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
}
catch (...)
{
Logger::write("Writer thread: caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
}
}
}
// successfully wrote data, now remove all written data
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
g_audit_records_list.resize(g_audit_records_list.size() - local_audit_records_list.size());
} // unlock
}
if (write_and_log)
{
Logger::write("Wrote %zu records in last %zu seconds",
local_audit_records_list.size(),
static_cast<size_t>(std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time).count()));
}
last_time = current_time;
}
catch (const std::exception &e)
{
Logger::write("Writer thread: caught exception: %s", e.what());
}
catch (...)
{
Logger::write("Writer thread: caught unknown exception");
}
last_time = current_time;
}
}
}
/**
 * Builds the column definition(s) of one field for the CREATE TABLE statement.
 *
 * Each known database type maps to one or more suffix+type fragments. Each
 * fragment is appended to the sanitized field name, and the results are joined
 * with ", ". An unknown type logs a warning and falls back to a single
 * " Nullable(String)" column.
 *
 * Example: ("uid", "integer") yields
 *   "<uid>_IntValue Nullable(Int64), <uid>_InterpretedValue LowCardinality(Nullable(String))"
 * where <uid> stands for sanitize_column_name("uid").
 */
std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
    static const std::map<std::string, std::vector<std::string> > audit_table_map = {
        { "string", { " Nullable(String)" } },
        { "integer", { "_IntValue Nullable(Int64)", "_InterpretedValue LowCardinality(Nullable(String))" } },
        { "string_array", { "_Name Array(String)", "_Value Array(String)" } }
    };

    const auto known_type = audit_table_map.find(audit_type);
    std::vector<std::string> fallback_types;
    if (known_type == audit_table_map.end())
    {
        // Fallback to string
        Logger::write("Warning: unknown database type for record name \"%s\"", name.c_str());
        fallback_types.push_back(" Nullable(String)");
    }
    const std::vector<std::string> &column_types =
        (known_type != audit_table_map.end()) ? known_type->second : fallback_types;

    std::string definition;
    for (size_t index = 0; index < column_types.size(); ++index)
    {
        if (index != 0)
        {
            definition += ", ";
        }
        definition += sanitize_column_name(name) + column_types[index];
    }
    return definition;
}
int main(int argc, char **argv)
{
if (argc != 2)
{
Logger::write("Error: USAGE: %s config", argv[0]);
return -1;
}
try
{
Logger::write("Initializing");
clickhouse::ClientOptions client_options;
std::string table_name = "AuditData";
std::string datatypes_filename;
std::string data_directory;
int write_timeout = -1;
int write_count_limit = -1;
bool use_writer_thread = false;
if (pipe(runpipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&runpipes)
{
close(runpipes[0]);
close(runpipes[1]);
runpipes[0] = -1;
runpipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
{
struct sigaction sa;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = term_handler;
if (sigaction(SIGTERM, &sa, NULL) < 0)
{
throw std::runtime_error("Failed to set sigterm handler");
}
}
if (pipe(write_thread_control_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_control_pipes)
{
close(write_thread_control_pipes[0]);
close(write_thread_control_pipes[1]);
write_thread_control_pipes[0] = -1;
write_thread_control_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
if (pipe(write_thread_data_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_data_pipes)
{
close(write_thread_data_pipes[0]);
close(write_thread_data_pipes[1]);
write_thread_data_pipes[0] = -1;
write_thread_data_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
/* First read config */
{
boost::property_tree::ptree clickhouse_config_tree;
std::ifstream stream(argv[1]);
boost::property_tree::read_ini(stream, clickhouse_config_tree);
auto general = clickhouse_config_tree.get_child_optional("General");
if (general)
{
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
optional_set(write_timeout, *general, "WriteTimeout");
optional_set(write_count_limit, *general, "WriteCountLimit");
optional_set(data_directory, *general, "DataDirectory");
if (!data_directory.empty())
{
try
{
if (boost::filesystem::create_directory(boost::filesystem::path(data_directory)))
{
// if directory is created, set permissions
boost::filesystem::permissions(boost::filesystem::path(data_directory), boost::filesystem::owner_all);
}
}
catch (const std::exception &exc)
{
Logger::write("Caught exception while creating data directory: %s", exc.what());
}
catch (...)
{
Logger::write("Caught unknown exception while creating data directory");
}
}
write_timeout = (write_timeout > 0) ? (write_timeout * 1000) : -1;
}
auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
if (client_connection)
{
optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
optional_set(client_options.host, *client_connection, "Hostname");
optional_set(client_options.port, *client_connection, "Port");
optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
optional_set(client_options.user, *client_connection, "Username");
optional_set(client_options.password, *client_connection, "Password");
optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
optional_set(client_options.send_retries, *client_connection, "SendRetries");
{
auto element = client_connection->get_child_optional("RetryTimeout");
if (element)
{
client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
}
}
{
auto element = client_connection->get_child_optional("CompressionMethod");
if (element)
{
const auto value = element->get_value<std::string>();
const std::map<std::string, clickhouse::CompressionMethod> compression_method_map =
{
{ "None", clickhouse::CompressionMethod::None },
{ "LZ4", clickhouse::CompressionMethod::LZ4 },
};
auto iter = compression_method_map.find(value);
if (iter != compression_method_map.end())
{
client_options.compression_method = iter->second;
}
else
{
client_options.compression_method = clickhouse::CompressionMethod::None;
}
}
}
}
auto logging = clickhouse_config_tree.get_child_optional("Logging");
if (logging)
{
std::string logfilename;
optional_set(logfilename, *logging, "Logfile");
if (!logfilename.empty())
{
Logger::open(logfilename.c_str());
}
}
}
Logger::initialize();
Logger::dump();
read_datatypes_map(datatypes_filename);
// If no limits are set, don't use special writer thread
use_writer_thread = ((write_timeout > 0) || (write_count_limit > 0));
/* Now connect to clickhouse */
clickhouse::Client client(client_options);
CallbackData callback_data;
callback_data.datatypes_map = get_datatypes_map();
callback_data.datatype_regexps_map = get_datatype_regexps_map();
callback_data.type_creation_map = get_type_creation_map();
if (!use_writer_thread)
{
callback_data.clickhouse_client = &client;
callback_data.table_name = table_name;
}
callback_data.write_timeout = write_timeout;
callback_data.write_count_limit = write_count_limit;
callback_data.data_directory = data_directory;
{
std::stringstream str;
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
callback_data.all_fields_set.insert(iter->first);
}
for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
callback_data.all_fields_set.insert(std::get<2>(*iter));
}
str << ", " << construct_clickhouse_datatype_string("unknown_field", "string_array");
callback_data.all_fields_set.insert("unknown_field");
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
client.Execute(str.str());
}
if (not data_directory.empty())
{
// read all previously saved data, and send it to writer thread or write it right here
// and remove written files
std::set<std::shared_ptr<AuditRecord>, SharedPointerLess<AuditRecord> > audit_records_set;
for (auto iter = boost::filesystem::directory_iterator(data_directory); iter != boost::filesystem::directory_iterator(); ++iter)
{
Logger::write("Reading saved file: %s", iter->path().c_str());
try
{
boost::property_tree::ptree file_data;
boost::property_tree::read_json(iter->path().native(), file_data);
auto audit_record = AuditRecord::fromPtree(file_data);
audit_record->filename = iter->path().native();
audit_records_set.insert(audit_record);
}
catch (const std::exception &exc)
{
Logger::write("Error while reading saved file \"%s\": %s", iter->path().c_str(), exc.what());
}
catch (...)
{
Logger::write("Unknown error while reading saved file \"%s\"", iter->path().c_str());
}
}
if (use_writer_thread)
{
// send data to buffering thread and notify it if necessary
size_t current_count = 0;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
g_audit_records_list.push_front(*iter);
}
} // unlock
if ((write_count_limit > 0) && (current_count >= static_cast<size_t>(write_count_limit)))
{
if (write_thread_data_pipes[1] != -1)
{
write(write_thread_data_pipes[1], "1", 1);
}
}
Logger::write("Sent all saved data to writer thread");
}
else
{
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef> data_map;
std::map<std::string, bool> data_present_map;
std::map<std::string, std::vector<AbstractRecordField::Column> > empty_data_map;
initialize_data_block(data_map, data_present_map, empty_data_map, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
generate_clickhouse_columns_from_audit_records(data_map, data_present_map, **iter);
fill_empty_columns(data_map, data_present_map, empty_data_map);
check_column_sizes_in_data(data_map);
// reset presence indication for next iteration
for (auto data_present_iter = data_present_map.begin(); data_present_iter != data_present_map.end(); ++data_present_iter)
{
data_present_iter->second = false;
}
}
add_audit_data_to_clickhouse_block(data_map, block);
client.Insert(table_name, block);
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
try
{
boost::filesystem::remove((*iter)->filename);
}
catch (const std::exception &e)
{
Logger::write("Caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
}
catch (...)
{
Logger::write("Caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
}
}
}
Logger::write("Wrote all saved data to DB");
}
Logger::write("Initializing data processing");
std::unique_ptr<std::thread, void(*)(std::thread*)> writer_thread(
use_writer_thread ? new std::thread(
writer_thread_function,
std::ref(client),
std::cref(table_name),
write_timeout,
write_count_limit,
std::cref(data_directory)
) : nullptr,
[] (std::thread *thread) {
if (thread)
{
if (thread->joinable())
{
if (write_thread_control_pipes[1] != -1)
{
write(write_thread_control_pipes[1], "1", 1);
}
thread->join();
}
delete thread;
}
});
std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){
if (obj)
{
auparse_flush_feed(obj);
auparse_destroy(obj);
}
});
if (!au)
{
throw std::runtime_error("Failed to initialize audit");
}
auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);
char data[MAX_AUDIT_MESSAGE_LENGTH];
struct pollfd pollfds[2];
pollfds[0].fd = STDIN_FILENO;
pollfds[1].fd = runpipes[0];
Logger::write("Initialization finished");
while (running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);
if (res < 0)
{
if (errno != EINTR)
{
// error occured, finish running
Logger::write("Poll returned error: %d", errno);
running = false;
}
}
else if (res == 0)
{
// timeout, do nothing
}
else if (pollfds[0].revents & POLLIN)
{
ssize_t readsize = read(STDIN_FILENO, data, sizeof(data));
if (readsize > 0)
{
auparse_feed(au.get(), data, readsize);
// assume that data wasn't split, and flush it
// otherwise, libauparse keeps increasing it's memory footprint
auparse_flush_feed(au.get());
}
else
{
// error occured, finish running
Logger::write("Read returned error: %d", errno);
running = false;
}
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// stdin closed, no more data, finish running
Logger::write("Stdin hangup or error, stopping");
running = false;
}
}
Logger::write("Finishing running, signal received: %s", got_signal ? "true" : "false");
}
catch (const std::exception &e)
{
Logger::write("Caught exception: %s", e.what());
return -1;
}
catch (...)
{
Logger::write("Caught unknown exception");
return -1;
}
try
{
Logger::write("Finished running");
}
catch (...)
{
// ignore exceptions here
}
return 0;
}