Move audit data convertion into separate functions

This commit is contained in:
Aleksei Nikiforov 2020-01-14 11:13:38 +03:00
parent 1895289a87
commit 36891f1273
2 changed files with 113 additions and 26 deletions

View File

@ -27,6 +27,7 @@
#include <string>
#include <map>
#include <fstream>
#include <sstream>
#include <vector>
#include <set>
@ -118,6 +119,102 @@ std::string sanitize_column_name(const std::string &name)
return result;
}
void initialize_data_block(
std::map<std::string, clickhouse::ColumnRef > &data,
const std::map<std::string, std::string> &datatypes_map,
const std::list<std::tuple<std::string, std::string, std::string> > &datatype_regexps_map,
const std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > &generators)
{
data["record_time"] = std::make_shared<clickhouse::ColumnDateTime>();
data["record_milli"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_serial"] = std::make_shared<clickhouse::ColumnUInt64>();
data["record_node"] = std::make_shared<clickhouse::ColumnString>();
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
{
auto factory_iter = generators.find(iter->second);
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(iter->first))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"\n", iter->second.c_str());
}
}
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
{
auto factory_iter = generators.find(std::get<1>(*iter));
if (factory_iter != generators.end())
{
auto columns = factory_iter->second(sanitize_column_name(std::get<2>(*iter)))->generateColumnsAndNames();
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[column_iter->name] = column_iter->value;
}
}
else
{
Logger::write("Couldn't find registered type name for record with type\"%s\"\n", std::get<2>(*iter).c_str());
}
}
}
void generate_clickhouse_columns_from_audit_records(
std::map<std::string, clickhouse::ColumnRef > &data,
const AuditRecord &record)
{
ensure_not_null(data["record_time"]->As<clickhouse::ColumnDateTime>())->Append(record.time);
ensure_not_null(data["record_milli"]->As<clickhouse::ColumnUInt64>())->Append(record.milliseconds);
ensure_not_null(data["record_serial"]->As<clickhouse::ColumnUInt64>())->Append(record.serial);
ensure_not_null(data["record_node"]->As<clickhouse::ColumnString>())->Append(record.node);
for (auto iter = record.fields.begin(); iter != record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames();
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
data[sanitize_column_name(column_iter->name)]->Append(column_iter->value);
}
}
// now check that each column has same size
{
auto iter = data.begin();
size_t count = iter->second->Size();
for (++iter ; iter != data.end(); ++iter)
{
if (count != iter->second->Size())
{
std::stringstream errorstr;
errorstr << "Columns size doesn't match, expected " << count << ", got " << iter->second->Size();
throw std::runtime_error(errorstr.str());
}
}
}
}
void add_audit_data_to_clickhouse_block(
const std::map<std::string, clickhouse::ColumnRef > &data,
clickhouse::Block &block)
{
for (auto iter = data.begin(); iter != data.end(); ++iter)
{
block.AppendColumn(iter->first, iter->second);
}
}
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
@ -271,33 +368,11 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
}
clickhouse::Block block;
std::map<std::string, clickhouse::ColumnRef > data;
auto record_time_record = std::make_shared<clickhouse::ColumnDateTime>();
record_time_record->Append(audit_record.time);
block.AppendColumn("record_time", record_time_record);
auto record_milliseconds_record = std::make_shared<clickhouse::ColumnUInt64>();
record_milliseconds_record->Append(audit_record.milliseconds);
block.AppendColumn("record_milli", record_milliseconds_record);
auto record_serial_record = std::make_shared<clickhouse::ColumnUInt64>();
record_serial_record->Append(audit_record.serial);
block.AppendColumn("record_serial", record_serial_record);
auto record_node_record = std::make_shared<clickhouse::ColumnString>();
record_node_record->Append(audit_record.node);
block.AppendColumn("record_node", record_node_record);
for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
{
auto columns = iter->second->generateColumnsAndNames();
iter->second->addToColumn(columns);
for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
{
block.AppendColumn(sanitize_column_name(column_iter->name), column_iter->value);
}
}
initialize_data_block(data, callback_data->datatypes_map, callback_data->datatype_regexps_map, callback_data->type_creation_map);
generate_clickhouse_columns_from_audit_records(data, audit_record);
add_audit_data_to_clickhouse_block(data, block);
callback_data->clickhouse_client->Insert(callback_data->table_name, block);
}

View File

@ -22,7 +22,19 @@
#define AUDITD_PLUGIN_CLICKHOUSE_UTILS_HPP
#include <string>
#include <memory>
std::string string_or_null(const char *data);
template <typename T>
std::shared_ptr<T> ensure_not_null(const std::shared_ptr<T> &value)
{
if (!value)
{
throw std::runtime_error("Invalid pointer cast");
}
return value;
}
#endif /* AUDITD_PLUGIN_CLICKHOUSE_UTILS_HPP */