Improve audit callback: start creating audit record to be added to DB later
This commit is contained in:
parent
3c09000edc
commit
4de04f70b9
@ -28,6 +28,7 @@
|
||||
#include <map>
|
||||
#include <fstream>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
|
||||
#include <auparse.h>
|
||||
#include <libaudit.h>
|
||||
@ -44,6 +45,21 @@
|
||||
int runpipes[2] = { -1, -1 };
|
||||
volatile bool running = true;
|
||||
|
||||
struct CallbackData
|
||||
{
|
||||
std::map<std::string, std::string> datatypes_map;
|
||||
std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
|
||||
std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > type_creation_map;
|
||||
std::set<std::string> all_fields_set;
|
||||
|
||||
clickhouse::Client *clickhouse_client;
|
||||
|
||||
CallbackData()
|
||||
: clickhouse_client(nullptr)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
static void term_handler(int sig)
|
||||
{
|
||||
if (runpipes[1] != -1)
|
||||
@ -90,7 +106,9 @@ void optional_set(T &value, const boost::property_tree::ptree &tree, const char
|
||||
|
||||
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
|
||||
{
|
||||
if (auparse_first_record(au) <= 0)
|
||||
CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
|
||||
|
||||
if ((!callback_data) || (auparse_first_record(au) <= 0))
|
||||
{
|
||||
return;
|
||||
}
|
||||
@ -101,22 +119,31 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
|
||||
{
|
||||
if (cb_event_type == AUPARSE_CB_EVENT_READY)
|
||||
{
|
||||
auto ltime = auparse_get_time(au);
|
||||
auto milli = auparse_get_milli(au);
|
||||
auto serial = auparse_get_serial(au);
|
||||
auto node = auparse_get_node(au);
|
||||
AuditRecord audit_record;
|
||||
|
||||
audit_record.time = auparse_get_time(au);
|
||||
audit_record.milliseconds = auparse_get_milli(au);
|
||||
audit_record.serial = auparse_get_serial(au);
|
||||
audit_record.node = auparse_get_node(au);
|
||||
|
||||
if (auparse_first_field(au) > 0)
|
||||
{
|
||||
do
|
||||
{
|
||||
auparse_get_field_name(au);
|
||||
auparse_get_field_type(au);
|
||||
auparse_get_field_str(au);
|
||||
auparse_get_field_int(au);
|
||||
auparse_interpret_field(au);
|
||||
std::string field_name = auparse_get_field_name(au);
|
||||
|
||||
if (field_name != "node") // skip node since it's already processed
|
||||
{
|
||||
auparse_get_field_name(au);
|
||||
auparse_get_field_type(au);
|
||||
auparse_get_field_str(au);
|
||||
auparse_get_field_int(au);
|
||||
auparse_interpret_field(au);
|
||||
}
|
||||
} while (auparse_next_field(au) > 0);
|
||||
}
|
||||
|
||||
// TODO: add audit_record to clickhouse database
|
||||
}
|
||||
|
||||
if (auparse_get_num_records(au) > record)
|
||||
@ -259,25 +286,29 @@ int main(int argc, char **argv)
|
||||
|
||||
read_datatypes_map(datatypes_filename);
|
||||
|
||||
auto datatypes_map = get_datatypes_map();
|
||||
auto datatype_regexps_map = get_datatype_regexps_map();
|
||||
auto type_creation_map = get_type_creation_map();
|
||||
|
||||
/* Now connect to clickhouse */
|
||||
clickhouse::Client client(client_options);
|
||||
|
||||
CallbackData callback_data;
|
||||
callback_data.datatypes_map = get_datatypes_map();
|
||||
callback_data.datatype_regexps_map = get_datatype_regexps_map();
|
||||
callback_data.type_creation_map = get_type_creation_map();
|
||||
callback_data.clickhouse_client = &client;
|
||||
|
||||
{
|
||||
std::stringstream str;
|
||||
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
|
||||
|
||||
for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
|
||||
for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
|
||||
{
|
||||
str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
|
||||
callback_data.all_fields_set.insert(iter->first);
|
||||
}
|
||||
|
||||
for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
|
||||
for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
|
||||
{
|
||||
str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
|
||||
callback_data.all_fields_set.insert(std::get<2>(*iter));
|
||||
}
|
||||
|
||||
str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";
|
||||
@ -298,7 +329,7 @@ int main(int argc, char **argv)
|
||||
throw std::runtime_error("Failed to initialize audit");
|
||||
}
|
||||
|
||||
auparse_add_callback(au.get(), auparse_callback, NULL, NULL);
|
||||
auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);
|
||||
|
||||
std::vector<char> data;
|
||||
data.resize(buffer_size);
|
||||
|
Loading…
Reference in New Issue
Block a user