auditd-plugin-clickhouse/auditd-plugin-clickhouse.cpp

1055 lines
30 KiB
C++

/*
* auditd-plugin-clickhouse is an auditd plugin for sending auditd data
* to clickhouse DB.
* Copyright (C) 2019-2020 Aleksei Nikiforov <darktemplar@basealt.ru>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <algorithm>
#include <string>
#include <map>
#include <fstream>
#include <sstream>
#include <vector>
#include <set>
#include <thread>
#include <mutex>
#include <chrono>
#include <utility>
#include <memory>
#include <auparse.h>
#include <libaudit.h>
#include <clickhouse-cpp/client.h>
#include <boost/scope_exit.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/optional.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <regex>
#include "auditd-datatypes.hpp"
#include "logging.hpp"
#include "utils.hpp"
int runpipes[2] = { -1, -1 };
volatile bool running = true;
int write_thread_control_pipes[2] = { -1, -1 };
int write_thread_data_pipes[2] = { -1, -1 };
std::mutex g_audit_records_list_mutex;
std::list<std::shared_ptr<AuditRecord> > g_audit_records_list;
template <typename T>
struct SharedPointerLess: std::binary_function<std::shared_ptr<T>, std::shared_ptr<T>, bool>
{
constexpr bool operator()(const std::shared_ptr<T> &lhs, const std::shared_ptr<T> &rhs) const
{
return (lhs && rhs) ? (*lhs < *rhs) : (lhs.get() < rhs.get());
}
};
struct CallbackData
{
std::map<std::string, std::string> datatypes_map;
std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > type_creation_map;
std::set<std::string> all_fields_set;
clickhouse::Client *clickhouse_client;
std::string table_name;
int write_timeout;
int write_count_limit;
std::string data_directory;
CallbackData()
: clickhouse_client(nullptr),
write_timeout(-1),
write_count_limit(-1)
{
}
};
static void stop_running()
{
running = false;
if (runpipes[1] != -1)
{
write(runpipes[1], "1", 1);
}
}
static void term_handler(int sig)
{
stop_running();
}
void initialize_data_block(
std::map<std::string, clickhouse::ColumnRef > &data,
const std::map<std::string, std::string> &datatypes_map,
const std::list<std::tuple<std::string, std::string, std::string> > &datatype_regexps_map,
const std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > &generators)
{
data["record_time"] = std::make_shared<clickhouse::ColumnDateTime>();
data["record_milli"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_serial"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_node"] = std::make_shared<clickhouse::ColumnString>();
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
{
auto factory_iter = generators.find(iter->second);
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(iter->first))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"", iter->second.c_str());
}
}
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
{
auto factory_iter = generators.find(std::get<1>(*iter));
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(std::get<2>(*iter)))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"", std::get<2>(*iter).c_str());
}
}
// also add "unknown_field"
{
auto columns = InterpretedStringArrayRecordField::createRecord(sanitize_column_name("unknown_field"))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
}
void generate_clickhouse_columns_from_audit_records(
std::map<std::string, clickhouse::ColumnRef > &data,
const AuditRecord &record)
{
ensure_not_null(data["record_time"]->As<clickhouse::ColumnDateTime>())->Append(record.time);
ensure_not_null(data["record_milli"]->As<clickhouse::ColumnUInt64>())->Append(record.milliseconds);
ensure_not_null(data["record_serial"]->As<clickhouse::ColumnUInt64>())->Append(record.serial);
ensure_not_null(data["record_node"]->As<clickhouse::ColumnString>())->Append(record.node);
for (auto iter = record.fields.begin(); iter != record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames();
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[sanitize_column_name(column_iter->name)]->Append(column_iter->value);
}
}
// now check that each column has same size
{
auto iter = data.begin();
size_t count = iter->second->Size();
for (++iter ; iter != data.end(); ++iter)
{
if (count != iter->second->Size())
{
std::stringstream errorstr;
errorstr << "Columns size doesn't match, expected " << count << ", got " << iter->second->Size();
throw std::runtime_error(errorstr.str());
}
}
}
}
void add_audit_data_to_clickhouse_block(
const std::map<std::string, clickhouse::ColumnRef > &data,
clickhouse::Block &block)
{
for (auto iter = data.begin(); iter != data.end(); ++iter)
{
block.AppendColumn(iter->first, iter->second);
}
}
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
if ((!callback_data) || (auparse_first_record(au) <= 0))
{
return;
}
size_t record = 1;
for (;;)
{
try
{
if (cb_event_type == AUPARSE_CB_EVENT_READY)
{
std::shared_ptr<AuditRecord> audit_record = std::make_shared<AuditRecord>();
audit_record->time = auparse_get_time(au);
audit_record->milliseconds = auparse_get_milli(au);
audit_record->serial = auparse_get_serial(au);
audit_record->node = string_or_null_and_free(auparse_get_node(au));
if (auparse_first_field(au) > 0)
{
do
{
const std::string field_name = string_or_null(auparse_get_field_name(au));
if (field_name != "node") // skip node since it's already processed
{
auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));
std::string database_type;
std::string database_name;
// first search for this field name in datatypes map,
// if it's not found there search all elements in datatypes regexp container
{
auto iter = callback_data->datatypes_map.find(field_name);
if (iter != callback_data->datatypes_map.end())
{
database_type = iter->second;
database_name = iter->first;
}
else
{
for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
{
std::regex audit_name_regex(std::get<0>(*regexp_iter));
if (std::regex_match(field_name, audit_name_regex))
{
database_type = std::get<1>(*regexp_iter);
database_name = std::get<2>(*regexp_iter);
break;
}
}
}
}
if (database_type.empty() || database_name.empty())
{
Logger::write("Couldn't find matching database entry for field with name \"%s\" and type \"%s\", putting it into \"unknown_field\" field", field_name.c_str(), field_type_to_string(field_type).c_str());
database_type = "string_array";
database_name = "unknown_field";
}
else if (!check_field_type(field_type, database_type, field_name))
{
Logger::write("Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual \"%s\"",
field_name.c_str(), database_type.c_str(), field_type_to_string(field_type).c_str());
}
std::shared_ptr<AbstractRecordField> data_ptr;
// If field is present in audit record, reuse it
// and update it's value,
// otherwise create new one and register it
{
auto data_iter = audit_record->fields.find(database_name);
if (data_iter != audit_record->fields.end())
{
data_ptr = data_iter->second;
}
else
{
auto iter = callback_data->type_creation_map.find(database_type);
if (iter != callback_data->type_creation_map.end())
{
data_ptr = iter->second(database_name);
}
else
{
Logger::write("Warning: no creator function found for data type \"%s\", using \"string\" as fallback", database_type.c_str());
data_ptr = InterpretedStringRecordField::createRecord(database_name);
}
audit_record->fields[database_name] = data_ptr;
}
}
data_ptr->addOrUpdateValue(au);
}
} while (auparse_next_field(au) > 0);
}
// first add all missing fields, keep data empty
{
auto missing_fields = callback_data->all_fields_set;
for (auto iter = audit_record->fields.begin(); iter != audit_record->fields.end(); ++iter)
{
missing_fields.erase(iter->first);
}
for (auto iter = missing_fields.begin(); iter != missing_fields.end(); ++iter)
{
std::string type_name;
auto type_iter = callback_data->datatypes_map.find(*iter);
if (type_iter != callback_data->datatypes_map.end())
{
type_name = type_iter->second;
}
else
{
for (auto regex_type_iter = callback_data->datatype_regexps_map.begin(); regex_type_iter != callback_data->datatype_regexps_map.end(); ++regex_type_iter)
{
if (*iter == std::get<2>(*regex_type_iter))
{
type_name = std::get<1>(*regex_type_iter);
break;
}
}
}
if (type_name.empty() && (*iter == "unknown_field"))
{
type_name = "string_array";
}
if (!type_name.empty())
{
auto factory_iter = callback_data->type_creation_map.find(type_name);
if (factory_iter != callback_data->type_creation_map.end())
{
audit_record->fields[*iter] = factory_iter->second(*iter);
}
}
else
{
Logger::write("Couldn't find registered type name for record with name\"%s\"", iter->c_str());
continue;
}
}
}
if (not callback_data->data_directory.empty())
{
audit_record->filename = callback_data->data_directory + boost::filesystem::path::separator + generate_name_for_audit_record(*audit_record);
boost::property_tree::write_json(audit_record->filename, audit_record->toPtree());
}
if (callback_data->clickhouse_client && (!callback_data->table_name.empty()))
{
// send data directly
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef > data;
initialize_data_block(data, callback_data->datatypes_map, callback_data->datatype_regexps_map, callback_data->type_creation_map);
generate_clickhouse_columns_from_audit_records(data, *audit_record);
add_audit_data_to_clickhouse_block(data, block);
callback_data->clickhouse_client->Insert(callback_data->table_name, block);
// Data written, remove it
if (not audit_record->filename.empty())
{
boost::filesystem::remove(audit_record->filename);
}
}
else
{
// send data to buffering thread and notify it if necessary
size_t current_count = 0;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
g_audit_records_list.push_front(audit_record);
current_count = g_audit_records_list.size();
} //unlock
if ((callback_data->write_count_limit > 0) && (current_count % static_cast<size_t>(callback_data->write_count_limit) == 0))
{
if (write_thread_data_pipes[1] != -1)
{
write(write_thread_data_pipes[1], "1", 1);
}
}
}
}
}
catch (const std::exception &e)
{
Logger::write("Caught exception while processing audit record %zu/%zu: %s", record, (size_t) auparse_get_num_records(au), e.what());
}
catch (...)
{
Logger::write("Caught unknown exception while processing audit record %zu/%zu", record, (size_t) auparse_get_num_records(au));
}
if (auparse_get_num_records(au) > record)
{
++record;
auparse_next_record(au);
}
else
{
break;
}
}
}
void writer_thread_function(clickhouse::Client &client, const std::string &table_name, int timeout, int count_limit, const std::string &data_directory)
{
struct pollfd pollfds[2];
pollfds[0].fd = write_thread_control_pipes[0];
pollfds[1].fd = write_thread_data_pipes[0];
auto current_time = std::chrono::steady_clock::now();
auto last_time = current_time;
bool thread_keep_running = true;
while (thread_keep_running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), timeout);
current_time = std::chrono::steady_clock::now();
bool write_data = false;
bool poll_timeout = false;
if (res < 0)
{
if (errno != EINTR)
{
// error occured, finish running
Logger::write("Poll returned error: %d", errno);
stop_running();
thread_keep_running = false;
}
}
else if (res == 0)
{
// timeout
write_data = true;
poll_timeout = true;
}
else if (pollfds[0].revents & POLLIN)
{
// input data in control pipe means shutdown request
thread_keep_running = false;
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// pipe closed too early, an error
Logger::write("Read returned pollhup or pollerr");
stop_running();
thread_keep_running = false;
}
else if (pollfds[1].revents & POLLIN)
{
// data was received, and records count exceeds limit, then write it
{
char c;
read(pollfds[1].fd, &c, sizeof(c));
}
write_data = true;
}
else if ((pollfds[1].revents & POLLHUP) || (pollfds[1].revents & POLLERR))
{
// pipe closed too early, an error
Logger::write("Read returned pollhup or pollerr");
stop_running();
thread_keep_running = false;
}
if (write_data || (!thread_keep_running))
{
try
{
// write all saved up data and log about it
std::list<std::shared_ptr<AuditRecord> > local_audit_records_list;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
local_audit_records_list = g_audit_records_list;
} // unlock
bool write_and_log = (poll_timeout
|| (!thread_keep_running)
|| ((count_limit > 0) && (local_audit_records_list.size() >= static_cast<size_t>(count_limit))));
if (write_and_log && (!local_audit_records_list.empty()))
{
Logger::write("Writer thread: preparing to write %zu elements", local_audit_records_list.size());
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef> data;
initialize_data_block(data, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());
for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
{
generate_clickhouse_columns_from_audit_records(data, **iter);
}
add_audit_data_to_clickhouse_block(data, block);
client.Insert(table_name, block);
if (not data_directory.empty())
{
for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
{
try
{
boost::filesystem::remove((*iter)->filename);
}
catch (const std::exception &e)
{
Logger::write("Writer thread: caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
}
catch (...)
{
Logger::write("Writer thread: caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
}
}
}
// successfully wrote data, now remove all written data
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
g_audit_records_list.resize(g_audit_records_list.size() - local_audit_records_list.size());
} // unlock
}
if (write_and_log)
{
Logger::write("Wrote %zu records in last %zu seconds",
local_audit_records_list.size(),
static_cast<size_t>(std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time).count()));
}
last_time = current_time;
}
catch (const std::exception &e)
{
Logger::write("Writer thread: caught exception: %s", e.what());
}
catch (...)
{
Logger::write("Writer thread: caught unknown exception");
}
last_time = current_time;
}
}
}
std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
static const std::map<std::string, std::vector<std::string> > audit_table_map = {
{ "string", { " Nullable(String)" } },
{ "integer", { "_IntValue Nullable(Int64)", "_InterpretedValue LowCardinality(Nullable(String))" } },
{ "string_array", { "_Name Array(String)", "_Value Array(String)" } }
};
std::vector<std::string> clickhouse_types;
auto iter = audit_table_map.find(audit_type);
if (iter != audit_table_map.end())
{
clickhouse_types = iter->second;
}
else
{
// Fallback to string
Logger::write("Warning: unknown database type for record name \"%s\"", name.c_str());
clickhouse_types.push_back(" Nullable(String)");
}
std::stringstream result;
result << sanitize_column_name(name) + clickhouse_types[0];
for (size_t i = 1; i < clickhouse_types.size(); ++i)
{
result << ", " << sanitize_column_name(name) + clickhouse_types[i];
}
return result.str();
}
int main(int argc, char **argv)
{
if (argc != 2)
{
Logger::write("Error: USAGE: %s config", argv[0]);
return -1;
}
try
{
clickhouse::ClientOptions client_options;
std::string table_name = "AuditData";
std::string datatypes_filename;
std::string data_directory;
int write_timeout = -1;
int write_count_limit = -1;
bool use_writer_thread = false;
if (pipe(runpipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&runpipes)
{
close(runpipes[0]);
close(runpipes[1]);
runpipes[0] = -1;
runpipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
{
struct sigaction sa;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = term_handler;
if (sigaction(SIGTERM, &sa, NULL) < 0)
{
throw std::runtime_error("Failed to set sigterm handler");
}
}
if (pipe(write_thread_control_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_control_pipes)
{
close(write_thread_control_pipes[0]);
close(write_thread_control_pipes[1]);
write_thread_control_pipes[0] = -1;
write_thread_control_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
if (pipe(write_thread_data_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_data_pipes)
{
close(write_thread_data_pipes[0]);
close(write_thread_data_pipes[1]);
write_thread_data_pipes[0] = -1;
write_thread_data_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
/* First read config */
{
boost::property_tree::ptree clickhouse_config_tree;
std::ifstream stream(argv[1]);
boost::property_tree::read_ini(stream, clickhouse_config_tree);
auto general = clickhouse_config_tree.get_child_optional("General");
if (general)
{
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
optional_set(write_timeout, *general, "WriteTimeout");
optional_set(write_count_limit, *general, "WriteCountLimit");
optional_set(data_directory, *general, "DataDirectory");
if (!data_directory.empty())
{
try
{
if (boost::filesystem::create_directory(boost::filesystem::path(data_directory)))
{
// if directory is created, set permissions
boost::filesystem::permissions(boost::filesystem::path(data_directory), boost::filesystem::owner_all);
}
}
catch (const std::exception &exc)
{
Logger::write("Caught exception while creating data directory: %s", exc.what());
}
catch (...)
{
Logger::write("Caught unknown exception while creating data directory");
}
}
write_timeout = (write_timeout > 0) ? (write_timeout * 1000) : -1;
}
auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
if (client_connection)
{
optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
optional_set(client_options.host, *client_connection, "Hostname");
optional_set(client_options.port, *client_connection, "Port");
optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
optional_set(client_options.user, *client_connection, "Username");
optional_set(client_options.password, *client_connection, "Password");
optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
optional_set(client_options.send_retries, *client_connection, "SendRetries");
{
auto element = client_connection->get_child_optional("RetryTimeout");
if (element)
{
client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
}
}
{
auto element = client_connection->get_child_optional("CompressionMethod");
if (element)
{
const auto value = element->get_value<std::string>();
const std::map<std::string, clickhouse::CompressionMethod> compression_method_map =
{
{ "None", clickhouse::CompressionMethod::None },
{ "LZ4", clickhouse::CompressionMethod::LZ4 },
};
auto iter = compression_method_map.find(value);
if (iter != compression_method_map.end())
{
client_options.compression_method = iter->second;
}
else
{
client_options.compression_method = clickhouse::CompressionMethod::None;
}
}
}
}
auto logging = clickhouse_config_tree.get_child_optional("Logging");
if (logging)
{
std::string logfilename;
optional_set(logfilename, *logging, "Logfile");
if (!logfilename.empty())
{
Logger::open(logfilename.c_str());
}
}
}
Logger::initialize();
Logger::dump();
read_datatypes_map(datatypes_filename);
// If no limits are set, don't use special writer thread
use_writer_thread = ((write_timeout > 0) || (write_count_limit > 0));
/* Now connect to clickhouse */
clickhouse::Client client(client_options);
CallbackData callback_data;
callback_data.datatypes_map = get_datatypes_map();
callback_data.datatype_regexps_map = get_datatype_regexps_map();
callback_data.type_creation_map = get_type_creation_map();
if (!use_writer_thread)
{
callback_data.clickhouse_client = &client;
callback_data.table_name = table_name;
}
callback_data.write_timeout = write_timeout;
callback_data.write_count_limit = write_count_limit;
callback_data.data_directory = data_directory;
{
std::stringstream str;
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
callback_data.all_fields_set.insert(iter->first);
}
for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
callback_data.all_fields_set.insert(std::get<2>(*iter));
}
str << ", " << construct_clickhouse_datatype_string("unknown_field", "string_array");
callback_data.all_fields_set.insert("unknown_field");
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
client.Execute(str.str());
}
if (not data_directory.empty())
{
// read all previously saved data, and send it to writer thread or write it right here
// and remove written files
std::set<std::shared_ptr<AuditRecord>, SharedPointerLess<AuditRecord> > audit_records_set;
for (auto iter = boost::filesystem::directory_iterator(data_directory); iter != boost::filesystem::directory_iterator(); ++iter)
{
Logger::write("Reading saved file: %s", iter->path().c_str());
try
{
boost::property_tree::ptree file_data;
boost::property_tree::read_json(iter->path().native(), file_data);
auto audit_record = AuditRecord::fromPtree(file_data);
audit_record->filename = iter->path().native();
audit_records_set.insert(audit_record);
}
catch (const std::exception &exc)
{
Logger::write("Error while reading saved file \"%s\": %s", iter->path().c_str(), exc.what());
}
catch (...)
{
Logger::write("Unknown error while reading saved file \"%s\"", iter->path().c_str());
}
}
if (use_writer_thread)
{
// send data to buffering thread and notify it if necessary
size_t current_count = 0;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
g_audit_records_list.push_front(*iter);
}
} // unlock
if ((write_count_limit > 0) && (current_count >= static_cast<size_t>(write_count_limit)))
{
if (write_thread_data_pipes[1] != -1)
{
write(write_thread_data_pipes[1], "1", 1);
}
}
Logger::write("Sent all saved data to writer thread");
}
else
{
std::map<std::string, clickhouse::ColumnRef > data_map;
clickhouse::Block block;
initialize_data_block(data_map, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
generate_clickhouse_columns_from_audit_records(data_map, **iter);
}
add_audit_data_to_clickhouse_block(data_map, block);
client.Insert(table_name, block);
for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
{
try
{
boost::filesystem::remove((*iter)->filename);
}
catch (const std::exception &e)
{
Logger::write("Caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
}
catch (...)
{
Logger::write("Caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
}
}
}
}
std::unique_ptr<std::thread, void(*)(std::thread*)> writer_thread(
use_writer_thread ? new std::thread(
writer_thread_function,
std::ref(client),
std::cref(table_name),
write_timeout,
write_count_limit,
std::cref(data_directory)
) : nullptr,
[] (std::thread *thread) {
if (thread)
{
if (thread->joinable())
{
if (write_thread_control_pipes[1] != -1)
{
write(write_thread_control_pipes[1], "1", 1);
}
thread->join();
}
delete thread;
}
});
std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){
if (obj)
{
auparse_flush_feed(obj);
auparse_destroy(obj);
}
});
if (!au)
{
throw std::runtime_error("Failed to initialize audit");
}
auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);
char data[MAX_AUDIT_MESSAGE_LENGTH];
struct pollfd pollfds[2];
pollfds[0].fd = STDIN_FILENO;
pollfds[1].fd = runpipes[0];
while (running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);
if (res < 0)
{
if (errno != EINTR)
{
// error occured, finish running
Logger::write("Poll returned error: %d", errno);
running = false;
}
}
else if (res == 0)
{
// timeout, do nothing
}
else if (pollfds[0].revents & POLLIN)
{
ssize_t readsize = read(STDIN_FILENO, data, sizeof(data));
if (readsize > 0)
{
auparse_feed(au.get(), data, readsize);
// assume that data wasn't split, and flush it
// otherwise, libauparse keeps increasing it's memory footprint
auparse_flush_feed(au.get());
}
else
{
// error occured, finish running
Logger::write("Read returned error: %d", errno);
running = false;
}
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// stdin closed, no more data, finish running
running = false;
}
}
}
catch (const std::exception &e)
{
Logger::write("Caught exception: %s", e.what());
return -1;
}
catch (...)
{
Logger::write("Caught unknown exception");
return -1;
}
return 0;
}