Implement clickhouse database creation

This commit is contained in:
Aleksei Nikiforov 2019-12-11 16:06:13 +03:00
parent fef1385176
commit d7ac138531
4 changed files with 80 additions and 36 deletions

View File

@ -12,3 +12,4 @@ Hostname=localhost
#SendRetries=1 #SendRetries=1
#RetryTimeout=5 #RetryTimeout=5
#CompressionMethod=None|LZ4 #CompressionMethod=None|LZ4
#TableName=AuditData

View File

@ -73,3 +73,18 @@ void read_datatypes_map(const std::string &config_filename)
} }
} }
} }
std::map<std::string, std::string> get_datatypes_map()
{
return s_datatypes_map;
}
std::list<std::tuple<std::string, std::string, std::string> > get_datatype_regexps_map()
{
return s_datatype_regexps_map;
}
std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name, auparse_state_t *record)> > get_type_creation_map()
{
return s_type_creation_map;
}

View File

@ -21,8 +21,17 @@
#ifndef AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP #ifndef AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP
#define AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP #define AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP
#include <string>
#include <map>
#include <list>
#include <functional>
#include <memory>
#include "auditd-record.hpp" #include "auditd-record.hpp"
void read_datatypes_map(const std::string &config_filename); void read_datatypes_map(const std::string &config_filename);
std::map<std::string, std::string> get_datatypes_map();
std::list<std::tuple<std::string, std::string, std::string> > get_datatype_regexps_map();
std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name, auparse_state_t *record)> > get_type_creation_map();
#endif /* AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP */ #endif /* AUDITD_PLUGIN_CLICKHOUSE_DATATYPES_HPP */

View File

@ -39,6 +39,8 @@
#include <boost/property_tree/ini_parser.hpp> #include <boost/property_tree/ini_parser.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include "auditd-datatypes.hpp"
int runpipes[2] = { -1, -1 }; int runpipes[2] = { -1, -1 };
volatile bool running = true; volatile bool running = true;
@ -88,10 +90,6 @@ void optional_set(T &value, const boost::property_tree::ptree &tree, const char
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data) void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{ {
FILE *f = fopen("/tmp/audit.log", "at");
if (f)
fprintf(f, "auparse_callback called\n");
if (auparse_first_record(au) <= 0) if (auparse_first_record(au) <= 0)
{ {
return; return;
@ -103,21 +101,10 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
{ {
if (cb_event_type == AUPARSE_CB_EVENT_READY) if (cb_event_type == AUPARSE_CB_EVENT_READY)
{ {
boost::optional<int> type; auto ltime = auparse_get_time(au);
auto milli = auparse_get_milli(au);
time_t ltime = auparse_get_time(au); auto serial = auparse_get_serial(au);
struct tm *date_time = gmtime(&ltime); auto node = auparse_get_node(au);
char *tm1 = ctime(&ltime);
auparse_get_serial(au);
auparse_get_node(au);
if (f)
fprintf(f, "Processing record %zu/%u: node %s, serial %lu\n",
record,
auparse_get_num_records(au),
auparse_get_node(au),
auparse_get_serial(au));
if (auparse_first_field(au) > 0) if (auparse_first_field(au) > 0)
{ {
@ -128,14 +115,6 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
auparse_get_field_str(au); auparse_get_field_str(au);
auparse_get_field_int(au); auparse_get_field_int(au);
auparse_interpret_field(au); auparse_interpret_field(au);
if (f)
fprintf(f, "Processing field: %s (type %d) = %s / %d / %s\n",
auparse_get_field_name(au),
auparse_get_field_type(au),
auparse_get_field_str(au),
auparse_get_field_int(au),
auparse_interpret_field(au));
} while (auparse_next_field(au) > 0); } while (auparse_next_field(au) > 0);
} }
} }
@ -150,11 +129,29 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
break; break;
} }
} }
}
if (f) std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
static const std::map<std::string, std::string> audit_table_map = {
{ "string", "String" },
{ "integer", "Nested( IntValue UInt64, InterpretedValue LowCardinality(String) )" }
};
std::string clickhouse_type;
auto iter = audit_table_map.find(audit_type);
if (iter != audit_table_map.end())
{ {
fclose(f); clickhouse_type = iter->second;
} }
else
{
// Fallback to string
// TODO: issue warning here about unknown type
clickhouse_type = "String";
}
return name + " " + clickhouse_type;
} }
int main(int argc, char **argv) int main(int argc, char **argv)
@ -169,9 +166,9 @@ int main(int argc, char **argv)
{ {
clickhouse::ClientOptions client_options; clickhouse::ClientOptions client_options;
//std::string table_name_cve = "Cve"; std::string table_name = "AuditData";
//std::string table_name_absent_packages;
size_t buffer_size = 4096; size_t buffer_size = 4096;
std::string datatypes_filename;
if (pipe(runpipes) < 0) if (pipe(runpipes) < 0)
{ {
@ -199,7 +196,6 @@ int main(int argc, char **argv)
} }
} }
#if 0
/* First read config */ /* First read config */
{ {
boost::property_tree::ptree clickhouse_config_tree; boost::property_tree::ptree clickhouse_config_tree;
@ -211,14 +207,13 @@ int main(int argc, char **argv)
if (general) if (general)
{ {
optional_set(buffer_size, *general, "BufferSize"); optional_set(buffer_size, *general, "BufferSize");
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
} }
auto client_connection = clickhouse_config_tree.get_child_optional("Connection"); auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
if (client_connection) if (client_connection)
{ {
//optional_set(table_name_cve, *client_connection, "TableNameCve", &is_valid_table_name); optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
//optional_set(table_name_absent_packages, *client_connection, "TableNameAbsentPackages", &is_valid_table_name);
optional_set(client_options.host, *client_connection, "Hostname"); optional_set(client_options.host, *client_connection, "Hostname");
optional_set(client_options.port, *client_connection, "Port"); optional_set(client_options.port, *client_connection, "Port");
optional_set(client_options.default_database, *client_connection, "DefaultDatabase"); optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
@ -261,9 +256,33 @@ int main(int argc, char **argv)
} }
} }
read_datatypes_map(datatypes_filename);
auto datatypes_map = get_datatypes_map();
auto datatype_regexps_map = get_datatype_regexps_map();
auto type_creation_map = get_type_creation_map();
/* Now connect to clickhouse */ /* Now connect to clickhouse */
clickhouse::Client client(client_options); clickhouse::Client client(client_options);
#endif
{
std::stringstream str;
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
}
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
{
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
}
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
client.Execute(str.str());
}
std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){ std::unique_ptr<auparse_state_t, void(*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj){
if (obj) if (obj)