auditd-plugin-clickhouse/auditd-plugin-clickhouse.cpp

559 lines
16 KiB
C++
Raw Normal View History

2019-11-11 16:55:42 +03:00
/*
* auditd-plugin-clickhouse is an auditd plugin for sending auditd data
* to clickhouse DB.
* Copyright (C) 2019 Aleksei Nikiforov <darktemplar@basealt.ru>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>

#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <auparse.h>
#include <libaudit.h>
2019-11-11 16:55:42 +03:00
#include <clickhouse-cpp/client.h>
#include <boost/scope_exit.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/optional.hpp>
2019-12-12 16:12:44 +03:00
#include <regex>
2019-12-11 16:06:13 +03:00
#include "auditd-datatypes.hpp"
// Self-pipe used by term_handler() to wake the poll() loop in main();
// both ends are -1 while the pipe is not open.
int runpipes[2] = { -1, -1 };
// Main-loop flag, cleared by term_handler() and by the event loop itself.
// It is written from a signal handler, and C/POSIX only guarantee such writes
// for objects of type volatile sig_atomic_t (a volatile bool is not enough).
volatile sig_atomic_t running = 1;
/*
 * State handed to auparse_callback() through auparse's user_data pointer.
 */
struct CallbackData
{
	// Exact audit field name -> database type name.
	std::map<std::string, std::string> datatypes_map;
	// (field-name regexp, database type name, database column name) entries,
	// consulted in order when a field has no exact entry in datatypes_map.
	std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
	// Database type name -> factory creating a record field for a column name.
	std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > type_creation_map;
	// Every column name declared when the table is created.
	std::set<std::string> all_fields_set;
	// Non-owning; main() points it at its client instance.
	clickhouse::Client *clickhouse_client = nullptr;
	// Table that every audit record is inserted into.
	std::string table_name;

	CallbackData() = default;
};
/*
 * SIGTERM handler installed by main(). Writes one byte to the self-pipe
 * (when it is open) so a blocking poll() on runpipes[0] wakes up, then
 * clears `running`.
 * write() is async-signal-safe. errno is saved and restored because write()
 * may change it, and the interrupted code may be about to inspect errno
 * (the event loop reports errno after poll()/read() failures).
 */
static void term_handler(int /* sig */)
{
	const int saved_errno = errno;
	const int write_fd = runpipes[1];
	if (write_fd != -1)
	{
		// Nothing safe can be done about a failed write here; `running` is
		// cleared regardless, so the result is deliberately ignored.
		const ssize_t written = write(write_fd, "1", 1);
		static_cast<void>(written);
	}
	running = false;
	errno = saved_errno;
}
/*
 * Returns true if `value` can be used unquoted as a ClickHouse table name:
 * non-empty, starting with an ASCII letter or '_', and containing only ASCII
 * letters, digits and '_'. The name is concatenated into the CREATE TABLE
 * statement, so this check is what keeps configuration values from
 * injecting SQL.
 * Character classes are tested explicitly instead of with std::isalnum(),
 * which is locale-dependent and undefined for negative char values; an
 * embedded '\0' is rejected as well.
 */
bool is_valid_table_name(const std::string &value)
{
	const auto is_letter_or_underscore = [] (const char c) {
		return ((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) || (c == '_');
	};
	const auto is_identifier_char = [&is_letter_or_underscore] (const char c) {
		return is_letter_or_underscore(c) || ((c >= '0') && (c <= '9'));
	};
	if (value.empty() || !is_letter_or_underscore(value.front()))
	{
		return false;
	}
	return std::all_of(value.begin() + 1, value.end(), is_identifier_char);
}
/*
 * Default validator for optional_set(): accepts every value.
 */
template <typename T>
bool always_true(const T &value)
{
	static_cast<void>(value);
	return true;
}
/*
 * If option `element_name` exists in `tree`, converts it to T once,
 * validates the converted value with `check_function` and stores it in
 * `value`. An absent option leaves `value` untouched.
 * Throws std::runtime_error when check_function rejects the value; any
 * exception from the get_value<T>() conversion propagates to the caller.
 */
template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name, bool (*check_function)(const T &))
{
	const auto element = tree.get_child_optional(element_name);
	if (!element)
	{
		return;
	}
	// Convert only once: the same value is both validated and stored.
	T converted = element->get_value<T>();
	if (!check_function(converted))
	{
		throw std::runtime_error(std::string("Invalid value for option '") + std::string(element_name) + std::string("'"));
	}
	value = std::move(converted);
}
/*
 * Convenience overload of optional_set() that accepts any value the option
 * converts to (no validation beyond the conversion itself).
 */
template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name)
{
	bool (*const accept_any)(const T &) = &always_true<T>;
	optional_set(value, tree, element_name, accept_any);
}
2019-12-16 18:21:13 +03:00
/*
 * Returns a copy of `name` with every '-' replaced by '_', so audit field
 * names can be used as ClickHouse column names.
 */
std::string sanitize_column_name(const std::string &name)
{
	std::string sanitized(name);
	for (char &ch : sanitized)
	{
		if (ch == '-')
		{
			ch = '_';
		}
	}
	return sanitized;
}
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
if ((!callback_data) || (auparse_first_record(au) <= 0))
{
return;
}
size_t record = 1;
for (;;)
{
try
{
if (cb_event_type == AUPARSE_CB_EVENT_READY)
{
AuditRecord audit_record;
audit_record.time = auparse_get_time(au);
audit_record.milliseconds = auparse_get_milli(au);
audit_record.serial = auparse_get_serial(au);
audit_record.node = auparse_get_node(au);
if (auparse_first_field(au) > 0)
{
do
{
std::string field_name = auparse_get_field_name(au);
2019-12-12 16:12:44 +03:00
if (field_name != "node") // skip node since it's already processed
2019-12-12 16:12:44 +03:00
{
const std::string field_name = auparse_get_field_name(au);
auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));
std::string database_type;
std::string database_name;
// first search for this field name in datatypes map,
// if it's not found there search all elements in datatypes regexp container
2019-12-12 16:12:44 +03:00
{
auto iter = callback_data->datatypes_map.find(field_name);
if (iter != callback_data->datatypes_map.end())
2019-12-12 16:12:44 +03:00
{
database_type = iter->second;
database_name = iter->first;
}
else
{
for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
2019-12-12 16:12:44 +03:00
{
std::regex audit_name_regex(std::get<0>(*regexp_iter));
if (std::regex_match(field_name, audit_name_regex))
{
database_type = std::get<1>(*regexp_iter);
database_name = std::get<2>(*regexp_iter);
break;
}
2019-12-12 16:12:44 +03:00
}
}
}
if (database_type.empty() || database_name.empty())
{
fprintf(stderr, "Couldn't find matching database entry for field with name \"%s\" and type %d\n", field_name.c_str(), (int) field_type);
continue;
}
2019-12-12 16:12:44 +03:00
if (!check_field_type(field_type, database_type, field_name))
2019-12-12 16:12:44 +03:00
{
fprintf(stderr, "Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual %d\n",
field_name.c_str(), database_type.c_str(), field_type);
2019-12-12 16:12:44 +03:00
}
std::shared_ptr<AbstractRecordField> data_ptr;
// If field is present in audit record, reuse it
// and update it's value,
// otherwise create new one and register it
2019-12-12 16:12:44 +03:00
{
auto data_iter = audit_record.fields.find(database_name);
if (data_iter != audit_record.fields.end())
2019-12-12 16:12:44 +03:00
{
data_ptr = data_iter->second;
2019-12-12 16:12:44 +03:00
}
else
{
auto iter = callback_data->type_creation_map.find(database_type);
if (iter != callback_data->type_creation_map.end())
{
data_ptr = iter->second(database_name);
}
else
{
fprintf(stderr, "Warning: no creator function found for data type \"%s\", using \"string\" as fallback\n", database_type.c_str());
data_ptr = InterpretedStringRecordField::createRecord(database_name);
}
2019-12-12 16:12:44 +03:00
audit_record.fields[database_name] = data_ptr;
}
2019-12-12 16:12:44 +03:00
}
data_ptr->addOrUpdateValue(au);
}
} while (auparse_next_field(au) > 0);
2019-12-16 15:35:40 +03:00
}
// first add all missing fields, keep data empty
2019-12-16 15:35:40 +03:00
{
auto missing_fields = callback_data->all_fields_set;
2019-12-16 15:35:40 +03:00
for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
2019-12-16 15:35:40 +03:00
{
missing_fields.erase(iter->first);
2019-12-16 15:35:40 +03:00
}
for (auto iter = missing_fields.begin(); iter != missing_fields.end(); ++iter)
2019-12-16 15:35:40 +03:00
{
std::string type_name;
auto type_iter = callback_data->datatypes_map.find(*iter);
if (type_iter != callback_data->datatypes_map.end())
{
type_name = type_iter->second;
}
else
2019-12-16 15:35:40 +03:00
{
for (auto regex_type_iter = callback_data->datatype_regexps_map.begin(); regex_type_iter != callback_data->datatype_regexps_map.end(); ++regex_type_iter)
2019-12-16 15:35:40 +03:00
{
if (*iter == std::get<2>(*regex_type_iter))
{
type_name = std::get<1>(*regex_type_iter);
break;
}
2019-12-16 15:35:40 +03:00
}
}
if (!type_name.empty())
2019-12-16 15:35:40 +03:00
{
auto factory_iter = callback_data->type_creation_map.find(type_name);
if (factory_iter != callback_data->type_creation_map.end())
{
audit_record.fields[*iter] = factory_iter->second(*iter);
}
}
else
{
fprintf(stderr, "Couldn't find registered type name for record with name\"%s\"\n", iter->c_str());
continue;
2019-12-16 15:35:40 +03:00
}
}
}
2019-12-16 16:09:45 +03:00
clickhouse::Block block;
auto record_time_record = std::make_shared<clickhouse::ColumnDateTime>();
record_time_record->Append(audit_record.time);
block.AppendColumn("record_time", record_time_record);
auto record_milliseconds_record = std::make_shared<clickhouse::ColumnUInt64>();
record_milliseconds_record->Append(audit_record.milliseconds);
block.AppendColumn("record_milli", record_milliseconds_record);
auto record_serial_record = std::make_shared<clickhouse::ColumnUInt64>();
record_serial_record->Append(audit_record.serial);
block.AppendColumn("record_serial", record_serial_record);
auto record_node_record = std::make_shared<clickhouse::ColumnString>();
record_node_record->Append(audit_record.node);
block.AppendColumn("record_node", record_node_record);
for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames();
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
2019-12-16 18:21:13 +03:00
block.AppendColumn(sanitize_column_name(column_iter->name), column_iter->value);
2019-12-16 16:09:45 +03:00
}
}
callback_data->clickhouse_client->Insert(callback_data->table_name, block);
}
}
catch (const std::exception &e)
{
fprintf(stderr, "Caught exception while processing audit record %zu/%zu: %s\n", record, (size_t) auparse_get_num_records(au), e.what());
}
catch (...)
{
fprintf(stderr, "Caught unknown exception while processing audit record %zu/%zu\n", record, (size_t) auparse_get_num_records(au));
}
if (auparse_get_num_records(au) > record)
{
++record;
auparse_next_record(au);
}
else
{
break;
}
}
2019-12-11 16:06:13 +03:00
}
2019-12-11 16:06:13 +03:00
std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
static const std::map<std::string, std::string> audit_table_map = {
{ "string", "Nullable(String)" },
{ "integer", "Nested( IntValue Nullable(Int64), InterpretedValue LowCardinality(Nullable(String)) )" },
{ "string_array", "Nested(Name Array(String), Value Array(String))" }
2019-12-11 16:06:13 +03:00
};
std::string clickhouse_type;
auto iter = audit_table_map.find(audit_type);
if (iter != audit_table_map.end())
{
clickhouse_type = iter->second;
}
else
{
2019-12-11 16:06:13 +03:00
// Fallback to string
fprintf(stderr, "Warning: unknown database type for record name \"%s\"\n", name.c_str());
clickhouse_type = "Nullable(String)";
}
2019-12-11 16:06:13 +03:00
2019-12-16 18:21:13 +03:00
return sanitize_column_name(name) + " " + clickhouse_type;
}
2019-11-11 16:55:42 +03:00
int main(int argc, char **argv)
{
if (argc != 2)
{
fprintf(stderr, "Error: USAGE: %s config\n", argv[0]);
return -1;
}
try
{
clickhouse::ClientOptions client_options;
2019-12-11 16:06:13 +03:00
std::string table_name = "AuditData";
size_t buffer_size = 4096;
2019-12-11 16:06:13 +03:00
std::string datatypes_filename;
if (pipe(runpipes) < 0)
{
throw std::runtime_error("Failed to create pipes");
}
BOOST_SCOPE_EXIT(&runpipes)
{
close(runpipes[0]);
close(runpipes[1]);
runpipes[0] = -1;
runpipes[1] = -1;
} BOOST_SCOPE_EXIT_END
{
struct sigaction sa;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = term_handler;
if (sigaction(SIGTERM, &sa, NULL) < 0)
{
throw std::runtime_error("Failed to set sigterm handler");
}
}
/* First read config */
{
boost::property_tree::ptree clickhouse_config_tree;
std::ifstream stream(argv[1]);
boost::property_tree::read_ini(stream, clickhouse_config_tree);
auto general = clickhouse_config_tree.get_child_optional("General");
if (general)
{
optional_set(buffer_size, *general, "BufferSize");
2019-12-11 16:06:13 +03:00
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
}
auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
if (client_connection)
{
2019-12-11 16:06:13 +03:00
optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
optional_set(client_options.host, *client_connection, "Hostname");
optional_set(client_options.port, *client_connection, "Port");
optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
optional_set(client_options.user, *client_connection, "Username");
optional_set(client_options.password, *client_connection, "Password");
optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
optional_set(client_options.send_retries, *client_connection, "SendRetries");
{
auto element = client_connection->get_child_optional("RetryTimeout");
if (element)
{
client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
}
}
{
auto element = client_connection->get_child_optional("CompressionMethod");
if (element)
{
const auto value = element->get_value<std::string>();
const std::map<std::string, clickhouse::CompressionMethod> compression_method_map =
{
{ "None", clickhouse::CompressionMethod::None },
{ "LZ4", clickhouse::CompressionMethod::LZ4 },
};
auto iter = compression_method_map.find(value);
if (iter != compression_method_map.end())
{
client_options.compression_method = iter->second;
}
else
{
client_options.compression_method = clickhouse::CompressionMethod::None;
}
}
}
}
}
2019-12-11 16:06:13 +03:00
read_datatypes_map(datatypes_filename);
/* Now connect to clickhouse */
clickhouse::Client client(client_options);
2019-12-11 16:06:13 +03:00
CallbackData callback_data;
callback_data.datatypes_map = get_datatypes_map();
callback_data.datatype_regexps_map = get_datatype_regexps_map();
callback_data.type_creation_map = get_type_creation_map();
callback_data.clickhouse_client = &client;
2019-12-16 16:09:45 +03:00
callback_data.table_name = table_name;
2019-12-11 16:06:13 +03:00
{
std::stringstream str;
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
2019-12-11 16:06:13 +03:00
{
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
callback_data.all_fields_set.insert(iter->first);
2019-12-11 16:06:13 +03:00
}
for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
2019-12-11 16:06:13 +03:00
{
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
callback_data.all_fields_set.insert(std::get<2>(*iter));
2019-12-11 16:06:13 +03:00
}
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
client.Execute(str.str());
}
std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){
if (obj)
{
auparse_flush_feed(obj);
auparse_destroy(obj);
}
});
if (!au)
{
throw std::runtime_error("Failed to initialize audit");
}
auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);
std::vector<char> data;
data.resize(buffer_size);
struct pollfd pollfds[2];
pollfds[0].fd = STDIN_FILENO;
pollfds[1].fd = runpipes[0];
while (running)
{
pollfds[0].events = POLLIN | POLLHUP;
pollfds[0].revents = 0;
pollfds[1].events = POLLIN | POLLHUP;
pollfds[1].revents = 0;
int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);
if (res < 0)
{
// error occured, finish running
fprintf(stderr, "Poll returned error: %d\n", errno);
running = false;
}
else if (res == 0)
{
// timeout, do nothing
}
else if (pollfds[0].revents & POLLIN)
{
ssize_t readsize = read(STDIN_FILENO, data.data(), data.size());
if (readsize > 0)
{
auparse_feed(au.get(), data.data(), readsize);
}
else
{
// error occured, finish running
fprintf(stderr, "Read returned error: %d\n", errno);
running = false;
}
}
else if (pollfds[0].revents & POLLHUP)
{
// stdin closed, no more data, finish running
running = false;
}
}
}
catch (const std::exception &e)
{
fprintf(stderr, "Caught exception: %s\n", e.what());
return -1;
}
catch (...)
{
fprintf(stderr, "Caught unknown exception\n");
return -1;
}
2019-11-11 16:55:42 +03:00
return 0;
}