auditd-plugin-clickhouse/auditd-plugin-clickhouse.cpp

/*
* auditd-plugin-clickhouse is an auditd plugin for sending auditd data
* to clickhouse DB.
* Copyright (C) 2019-2020 Aleksei Nikiforov <darktemplar@basealt.ru>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <algorithm>
#include <string>
#include <map>
#include <fstream>
#include <sstream>
#include <vector>
#include <set>
#include <thread>
#include <mutex>
#include <chrono>
#include <list>
#include <tuple>
#include <functional>
#include <memory>
#include <cctype>
#include <auparse.h>
#include <libaudit.h>
#include <clickhouse-cpp/client.h>
#include <boost/scope_exit.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/optional.hpp>
#include <regex>
#include "auditd-datatypes.hpp"
#include "logging.hpp"
#include "utils.hpp"
int runpipes[2] = { -1, -1 };
volatile bool running = true;
int write_thread_control_pipes[2] = { -1, -1 };
int write_thread_data_pipes[2] = { -1, -1 };
std::mutex g_audit_records_list_mutex;
std::list<AuditRecord> g_audit_records_list;
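// Context passed to the auparse callback: field-to-datatype mappings loaded
// from the datatypes description file, factories for record field objects,
// and either a direct ClickHouse client (when no writer thread is used) or
// the buffering limits handled by the writer thread.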
struct CallbackData
{
std::map<std::string, std::string> datatypes_map;
std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > type_creation_map;
std::set<std::string> all_fields_set;
clickhouse::Client *clickhouse_client;
std::string table_name;
int write_timeout;
int write_count_limit;
CallbackData()
: clickhouse_client(nullptr),
write_timeout(-1),
write_count_limit(-1)
{
}
};
static void stop_running()
{
running = false;
if (runpipes[1] != -1)
{
write(runpipes[1], "1", 1);
}
}
static void term_handler(int sig)
{
stop_running();
}
bool is_valid_table_name(const std::string &value)
{
return (!value.empty()) && (std::find_if_not(value.begin(), value.end(), [] (const char c) { return (std::isalnum(static_cast<unsigned char>(c)) || (c == '_')); } ) == value.end());
}
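// Helpers for reading optional, optionally validated values from the INI
// configuration tree; a value that fails validation aborts startup with an
// exception.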
template <typename T>
bool always_true(const T &)
{
return true;
}
template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name, bool (*check_function)(const T &))
{
auto element = tree.get_child_optional(element_name);
if (element)
{
if (check_function(element->get_value<T>()))
{
value = element->get_value<T>();
}
else
{
throw std::runtime_error(std::string("Invalid value for option '") + std::string(element_name) + std::string("'"));
}
}
}
template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name)
{
optional_set(value, tree, element_name, &always_true<T>);
}
std::string sanitize_column_name(const std::string &name)
{
auto result = name;
std::replace(result.begin(), result.end(), '-', '_');
return result;
}
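// Create one empty ClickHouse column per fixed record field (time, milli,
// serial, node) and per configured audit field, keyed by sanitized column
// name.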
void initialize_data_block(
std::map<std::string, clickhouse::ColumnRef > &data,
const std::map<std::string, std::string> &datatypes_map,
const std::list<std::tuple<std::string, std::string, std::string> > &datatype_regexps_map,
const std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > &generators)
{
data["record_time"] = std::make_shared<clickhouse::ColumnDateTime>();
data["record_milli"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_serial"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_node"] = std::make_shared<clickhouse::ColumnString>();
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
{
auto factory_iter = generators.find(iter->second);
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(iter->first))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"\n", iter->second.c_str());
}
}
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
{
auto factory_iter = generators.find(std::get<1>(*iter));
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(std::get<2>(*iter)))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"\n", std::get<2>(*iter).c_str());
}
}
}
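// Append a single audit record as one row to the prepared column map and
// verify afterwards that all columns still contain the same number of rows.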
void generate_clickhouse_columns_from_audit_records(
std::map<std::string, clickhouse::ColumnRef > &data,
const AuditRecord &record)
{
ensure_not_null(data["record_time"]->As<clickhouse::ColumnDateTime>())->Append(record.time);
ensure_not_null(data["record_milli"]->As<clickhouse::ColumnUInt64>())->Append(record.milliseconds);
ensure_not_null(data["record_serial"]->As<clickhouse::ColumnUInt64>())->Append(record.serial);
ensure_not_null(data["record_node"]->As<clickhouse::ColumnString>())->Append(record.node);
for (auto iter = record.fields.begin(); iter != record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames();
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[sanitize_column_name(column_iter->name)]->Append(column_iter->value);
}
}
// now check that each column has the same size
{
auto iter = data.begin();
size_t count = iter->second->Size();
for (++iter ; iter != data.end(); ++iter)
{
if (count != iter->second->Size())
{
std::stringstream errorstr;
errorstr << "Columns size doesn't match, expected " << count << ", got " << iter->second->Size();
throw std::runtime_error(errorstr.str());
}
}
}
}
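// Attach every prepared column to the ClickHouse block under its column name.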
void add_audit_data_to_clickhouse_block(
const std::map<std::string, clickhouse::ColumnRef > &data,
clickhouse::Block &block)
{
for (auto iter = data.begin(); iter != data.end(); ++iter)
{
block.AppendColumn(iter->first, iter->second);
}
}
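// auparse callback invoked for each complete audit event. For every record in
// the event it maps audit field names to configured database columns (exact
// name match first, then regular expressions), adds empty placeholders for
// configured fields missing from this record, and then either inserts the
// data directly into ClickHouse or queues it for the writer thread.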
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
if ((!callback_data) || (auparse_first_record(au) <= 0))
{
return;
}
size_t record = 1;
for (;;)
{
try
{
if (cb_event_type == AUPARSE_CB_EVENT_READY)
{
AuditRecord audit_record;
audit_record.time = auparse_get_time(au);
audit_record.milliseconds = auparse_get_milli(au);
audit_record.serial = auparse_get_serial(au);
audit_record.node = string_or_null(auparse_get_node(au));
if (auparse_first_field(au) > 0)
{
do
{
const std::string field_name = string_or_null(auparse_get_field_name(au));
if (field_name != "node") // skip node since it's already processed
{
auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));
std::string database_type;
std::string database_name;
// first search for this field name in datatypes map,
// if it's not found there search all elements in datatypes regexp container
{
auto iter = callback_data->datatypes_map.find(field_name);
if (iter != callback_data->datatypes_map.end())
{
database_type = iter->second;
database_name = iter->first;
}
else
{
for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
{
std::regex audit_name_regex(std::get<0>(*regexp_iter));
if (std::regex_match(field_name, audit_name_regex))
{
database_type = std::get<1>(*regexp_iter);
database_name = std::get<2>(*regexp_iter);
break;
}
}
}
}
if (database_type.empty() || database_name.empty())
{
Logger::write("Couldn't find matching database entry for field with name \"%s\" and type \"%s\"\n", field_name.c_str(), field_type_to_string(field_type).c_str());
continue;
}
if (!check_field_type(field_type, database_type, field_name))
{
Logger::write("Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual \"%s\"\n",
field_name.c_str(), database_type.c_str(), field_type_to_string(field_type).c_str());
}
std::shared_ptr<AbstractRecordField> data_ptr;
// If the field is already present in the audit record, reuse it
// and update its value,
// otherwise create a new one and register it
{
auto data_iter = audit_record.fields.find(database_name);
if (data_iter != audit_record.fields.end())
{
data_ptr = data_iter->second;
}
else
{
auto iter = callback_data->type_creation_map.find(database_type);
if (iter != callback_data->type_creation_map.end())
{
data_ptr = iter->second(database_name);
}
else
{
Logger::write("Warning: no creator function found for data type \"%s\", using \"string\" as fallback\n", database_type.c_str());
data_ptr = InterpretedStringRecordField::createRecord(database_name);
}
audit_record.fields[database_name] = data_ptr;
}
}
data_ptr->addOrUpdateValue(au);
}
} while (auparse_next_field(au) > 0);
}
// first add all missing fields, keep data empty
{
auto missing_fields = callback_data->all_fields_set;
for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
{
missing_fields.erase(iter->first);
}
for (auto iter = missing_fields.begin(); iter != missing_fields.end(); ++iter)
{
std::string type_name;
auto type_iter = callback_data->datatypes_map.find(*iter);
if (type_iter != callback_data->datatypes_map.end())
{
type_name = type_iter->second;
}
else
{
for (auto regex_type_iter = callback_data->datatype_regexps_map.begin(); regex_type_iter != callback_data->datatype_regexps_map.end(); ++regex_type_iter)
{
if (*iter == std::get<2>(*regex_type_iter))
{
type_name = std::get<1>(*regex_type_iter);
break;
}
}
}
if (!type_name.empty())
{
auto factory_iter = callback_data->type_creation_map.find(type_name);
if (factory_iter != callback_data->type_creation_map.end())
{
audit_record.fields[*iter] = factory_iter->second(*iter);
}
}
else
{
Logger::write("Couldn't find registered type name for record with name\"%s\"\n", iter->c_str());
continue;
}
}
}
if (callback_data->clickhouse_client && (!callback_data->table_name.empty()))
{
// send data directly
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef > data;
initialize_data_block(data, callback_data->datatypes_map, callback_data->datatype_regexps_map, callback_data->type_creation_map);
generate_clickhouse_columns_from_audit_records(data, audit_record);
add_audit_data_to_clickhouse_block(data, block);
callback_data->clickhouse_client->Insert(callback_data->table_name, block);
}
else
{
// send data to buffering thread and notify it if necessary
size_t current_count = 0;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
g_audit_records_list.push_front(audit_record);
current_count = g_audit_records_list.size();
} //unlock
if ((callback_data->write_count_limit > 0) && (current_count >= static_cast<size_t>(callback_data->write_count_limit)))
{
if (write_thread_data_pipes[1] != -1)
{
write(write_thread_data_pipes[1], "1", 1);
}
}
}
}
}
catch (const std::exception &e)
{
Logger::write("Caught exception while processing audit record %zu/%zu: %s\n", record, (size_t) auparse_get_num_records(au), e.what());
}
catch (...)
{
Logger::write("Caught unknown exception while processing audit record %zu/%zu\n", record, (size_t) auparse_get_num_records(au));
}
if (auparse_get_num_records(au) > record)
{
++record;
auparse_next_record(au);
}
else
{
break;
}
}
}
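// Writer thread: waits on the control and data pipes with the configured
// timeout, flushes all buffered records to ClickHouse on timeout, on a
// count-limit notification or on shutdown, and removes records from the
// shared list only after a successful insert.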
void writer_thread_function(clickhouse::Client &client, const std::string &table_name, int timeout, int count_limit)
{
struct pollfd pollfds[2];
pollfds[0].fd = write_thread_control_pipes[0];
pollfds[1].fd = write_thread_data_pipes[0];
auto current_time = std::chrono::steady_clock::now();
auto last_time = current_time;
bool thread_keep_running = true;
while (thread_keep_running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), timeout);
current_time = std::chrono::steady_clock::now();
bool write_data = false;
if (res < 0)
{
// error occurred, finish running
Logger::write("Poll returned error: %d\n", errno);
stop_running();
thread_keep_running = false;
}
else if (res == 0)
{
// timeout
write_data = true;
}
else if (pollfds[0].revents & POLLIN)
{
// input data in control pipe means shutdown request
thread_keep_running = false;
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// control pipe closed unexpectedly, treat as an error
Logger::write("Control pipe closed unexpectedly\n");
stop_running();
thread_keep_running = false;
}
else if (pollfds[1].revents & POLLIN)
{
// notification on the data pipe: the buffered record count reached the limit, flush now
{
char c;
read(pollfds[1].fd, &c, sizeof(c));
}
write_data = true;
}
else if ((pollfds[1].revents & POLLHUP) || (pollfds[1].revents & POLLERR))
{
// data pipe closed unexpectedly, treat as an error
Logger::write("Data pipe closed unexpectedly\n");
stop_running();
thread_keep_running = false;
}
if (write_data || (!thread_keep_running))
{
try
{
// write all saved up data and log about it
std::list<AuditRecord> local_audit_records_list;
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
local_audit_records_list = g_audit_records_list;
} // unlock
if (!local_audit_records_list.empty())
{
Logger::write("Writer thread: preparing to write %zu elements\n", local_audit_records_list.size());
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef> data;
initialize_data_block(data, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());
for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
{
generate_clickhouse_columns_from_audit_records(data, *iter);
}
add_audit_data_to_clickhouse_block(data, block);
client.Insert(table_name, block);
// successfully wrote data, now remove all written data
{ // lock
std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
g_audit_records_list.resize(g_audit_records_list.size() - local_audit_records_list.size());
} // unlock
}
Logger::write("Wrote %zu records in last %zu seconds\n",
local_audit_records_list.size(),
static_cast<size_t>(std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time).count()));
}
catch (const std::exception &e)
{
Logger::write("Writer thread: caught exception: %s\n", e.what());
}
catch (...)
{
Logger::write("Writer thread: caught unknown exception\n");
}
last_time = current_time;
}
}
}
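// Map an audit datatype name to its ClickHouse column definition(s):
// "string" becomes one Nullable(String) column, "integer" becomes a numeric
// plus an interpreted-string column, "string_array" becomes parallel
// name/value Array(String) columns; unknown types fall back to
// Nullable(String).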
std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
static const std::map<std::string, std::vector<std::string> > audit_table_map = {
{ "string", { " Nullable(String)" } },
{ "integer", { "_IntValue Nullable(Int64)", "_InterpretedValue LowCardinality(Nullable(String))" } },
{ "string_array", { "_Name Array(String)", "_Value Array(String)" } }
};
std::vector<std::string> clickhouse_types;
auto iter = audit_table_map.find(audit_type);
if (iter != audit_table_map.end())
{
clickhouse_types = iter->second;
}
else
{
// Fallback to string
Logger::write("Warning: unknown database type for record name \"%s\"\n", name.c_str());
clickhouse_types.push_back(" Nullable(String)");
}
std::stringstream result;
result << sanitize_column_name(name) + clickhouse_types[0];
for (size_t i = 1; i < clickhouse_types.size(); ++i)
{
result << ", " << sanitize_column_name(name) + clickhouse_types[i];
}
return result.str();
}
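// Entry point: read the INI configuration named on the command line, set up
// the shutdown pipes and the SIGTERM handler, connect to ClickHouse, create
// the destination table if needed, optionally start the writer thread, and
// feed audit records from stdin into auparse until EOF or termination.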
int main(int argc, char **argv)
{
if (argc != 2)
{
Logger::write("Error: USAGE: %s config\n", argv[0]);
return -1;
}
try
{
clickhouse::ClientOptions client_options;
std::string table_name = "AuditData";
size_t buffer_size = 4096;
std::string datatypes_filename;
int write_timeout = -1;
int write_count_limit = -1;
bool use_writer_thread = false;
if (pipe(runpipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&runpipes)
{
close(runpipes[0]);
close(runpipes[1]);
runpipes[0] = -1;
runpipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
{
struct sigaction sa;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = term_handler;
if (sigaction(SIGTERM, &sa, NULL) < 0)
{
throw std::runtime_error("Failed to set sigterm handler");
}
}
if (pipe(write_thread_control_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_control_pipes)
{
close(write_thread_control_pipes[0]);
close(write_thread_control_pipes[1]);
write_thread_control_pipes[0] = -1;
write_thread_control_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
if (pipe(write_thread_data_pipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&write_thread_data_pipes)
{
close(write_thread_data_pipes[0]);
close(write_thread_data_pipes[1]);
write_thread_data_pipes[0] = -1;
write_thread_data_pipes[1] = -1;
} BOOST_SCOPE_EXIT_END;
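/*
 * Example configuration (values are illustrative only; the option names are
 * exactly the keys parsed below):
 *
 * [General]
 * BufferSize = 4096
 * DatatypesDescriptionFile = datatypes.ini
 * WriteTimeout = 30
 * WriteCountLimit = 1000
 *
 * [Connection]
 * TableName = AuditData
 * Hostname = localhost
 * Port = 9000
 * DefaultDatabase = default
 * Username = default
 * Password =
 * PingBeforeQuery = true
 * SendRetries = 3
 * RetryTimeout = 5
 * CompressionMethod = LZ4
 *
 * [Logging]
 * Logfile = audit-clickhouse.log
 */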
/* First read config */
{
boost::property_tree::ptree clickhouse_config_tree;
std::ifstream stream(argv[1]);
boost::property_tree::read_ini(stream, clickhouse_config_tree);
auto general = clickhouse_config_tree.get_child_optional("General");
if (general)
{
optional_set(buffer_size, *general, "BufferSize");
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
optional_set(write_timeout, *general, "WriteTimeout");
optional_set(write_count_limit, *general, "WriteCountLimit");
write_timeout = (write_timeout > 0) ? (write_timeout * 1000) : -1;
}
auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
if (client_connection)
{
optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
optional_set(client_options.host, *client_connection, "Hostname");
optional_set(client_options.port, *client_connection, "Port");
optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
optional_set(client_options.user, *client_connection, "Username");
optional_set(client_options.password, *client_connection, "Password");
optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
optional_set(client_options.send_retries, *client_connection, "SendRetries");
{
auto element = client_connection->get_child_optional("RetryTimeout");
if (element)
{
client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
}
}
{
auto element = client_connection->get_child_optional("CompressionMethod");
if (element)
{
const auto value = element->get_value<std::string>();
const std::map<std::string, clickhouse::CompressionMethod> compression_method_map =
{
{ "None", clickhouse::CompressionMethod::None },
{ "LZ4", clickhouse::CompressionMethod::LZ4 },
};
auto iter = compression_method_map.find(value);
if (iter != compression_method_map.end())
{
client_options.compression_method = iter->second;
}
else
{
client_options.compression_method = clickhouse::CompressionMethod::None;
}
}
}
}
auto logging = clickhouse_config_tree.get_child_optional("Logging");
if (logging)
{
std::string logfilename;
optional_set(logfilename, *logging, "Logfile");
if (!logfilename.empty())
{
Logger::open(logfilename.c_str());
}
}
}
read_datatypes_map(datatypes_filename);
// If no limits are set, don't use special writer thread
use_writer_thread = ((write_timeout > 0) || (write_count_limit > 0));
/* Now connect to clickhouse */
clickhouse::Client client(client_options);
CallbackData callback_data;
callback_data.datatypes_map = get_datatypes_map();
callback_data.datatype_regexps_map = get_datatype_regexps_map();
callback_data.type_creation_map = get_type_creation_map();
if (!use_writer_thread)
{
callback_data.clickhouse_client = &client;
callback_data.table_name = table_name;
}
callback_data.write_timeout = write_timeout;
callback_data.write_count_limit = write_count_limit;
{
std::stringstream str;
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
callback_data.all_fields_set.insert(iter->first);
}
for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
callback_data.all_fields_set.insert(std::get<2>(*iter));
}
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
client.Execute(str.str());
}
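// auparse state with a custom deleter that flushes any partially fed data
// before destroying the parser.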
std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){
if (obj)
{
auparse_flush_feed(obj);
auparse_destroy(obj);
}
});
if (!au)
{
throw std::runtime_error("Failed to initialize audit");
}
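// The writer thread is started only when a timeout or count limit is
// configured; its deleter requests shutdown through the control pipe and
// joins the thread before the ClickHouse client goes out of scope.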
std::unique_ptr<std::thread, void(*)(std::thread*)> writer_thread(
use_writer_thread ? new std::thread(writer_thread_function, std::ref(client), std::cref(table_name), write_timeout, write_count_limit) : nullptr,
[] (std::thread *thread) {
if (thread)
{
if (thread->joinable())
{
if (write_thread_control_pipes[1] != -1)
{
write(write_thread_control_pipes[1], "1", 1);
}
thread->join();
}
delete thread;
}
});
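// Main loop: auditd delivers audit records on stdin; read them in BufferSize
// chunks and feed them to auparse, which invokes auparse_callback for every
// complete event. runpipes[0] is polled so that the SIGTERM handler can wake
// the loop and let it observe running == false.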
auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);
std::vector<char> data;
data.resize(buffer_size);
struct pollfd pollfds[2];
pollfds[0].fd = STDIN_FILENO;
pollfds[1].fd = runpipes[0];
while (running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);
if (res < 0)
{
// error occurred, finish running
Logger::write("Poll returned error: %d\n", errno);
running = false;
}
else if (res == 0)
{
// timeout, do nothing
}
else if (pollfds[0].revents & POLLIN)
{
ssize_t readsize = read(STDIN_FILENO, data.data(), data.size());
if (readsize > 0)
{
auparse_feed(au.get(), data.data(), readsize);
}
else
{
// EOF or read error, finish running
Logger::write("Read returned error: %d\n", errno);
running = false;
}
}
else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
{
// stdin closed, no more data, finish running
running = false;
}
}
}
catch (const std::exception &e)
{
Logger::write("Caught exception: %s\n", e.what());
return -1;
}
catch (...)
{
Logger::write("Caught unknown exception\n");
return -1;
}
return 0;
}