diff --git a/auditd-plugin-clickhouse.cpp b/auditd-plugin-clickhouse.cpp index ddbe037..e972a3d 100644 --- a/auditd-plugin-clickhouse.cpp +++ b/auditd-plugin-clickhouse.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -44,6 +45,21 @@ int runpipes[2] = { -1, -1 }; volatile bool running = true; +struct CallbackData +{ + std::map datatypes_map; + std::list > datatype_regexps_map; + std::map(const std::string &name)> > type_creation_map; + std::set all_fields_set; + + clickhouse::Client *clickhouse_client; + + CallbackData() + : clickhouse_client(nullptr) + { + } +}; + static void term_handler(int sig) { if (runpipes[1] != -1) @@ -90,7 +106,9 @@ void optional_set(T &value, const boost::property_tree::ptree &tree, const char void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data) { - if (auparse_first_record(au) <= 0) + CallbackData *callback_data = reinterpret_cast(user_data); + + if ((!callback_data) || (auparse_first_record(au) <= 0)) { return; } @@ -101,22 +119,31 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi { if (cb_event_type == AUPARSE_CB_EVENT_READY) { - auto ltime = auparse_get_time(au); - auto milli = auparse_get_milli(au); - auto serial = auparse_get_serial(au); - auto node = auparse_get_node(au); + AuditRecord audit_record; + + audit_record.time = auparse_get_time(au); + audit_record.milliseconds = auparse_get_milli(au); + audit_record.serial = auparse_get_serial(au); + audit_record.node = auparse_get_node(au); if (auparse_first_field(au) > 0) { do { - auparse_get_field_name(au); - auparse_get_field_type(au); - auparse_get_field_str(au); - auparse_get_field_int(au); - auparse_interpret_field(au); + std::string field_name = auparse_get_field_name(au); + + if (field_name != "node") // skip node since it's already processed + { + auparse_get_field_name(au); + auparse_get_field_type(au); + auparse_get_field_str(au); + auparse_get_field_int(au); + auparse_interpret_field(au); + } } while (auparse_next_field(au) > 0); } + + // TODO: add audit_record to clickhouse database } if (auparse_get_num_records(au) > record) @@ -259,25 +286,29 @@ int main(int argc, char **argv) read_datatypes_map(datatypes_filename); - auto datatypes_map = get_datatypes_map(); - auto datatype_regexps_map = get_datatype_regexps_map(); - auto type_creation_map = get_type_creation_map(); - /* Now connect to clickhouse */ clickhouse::Client client(client_options); + CallbackData callback_data; + callback_data.datatypes_map = get_datatypes_map(); + callback_data.datatype_regexps_map = get_datatype_regexps_map(); + callback_data.type_creation_map = get_type_creation_map(); + callback_data.clickhouse_client = &client; + { std::stringstream str; str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String"; - for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter) + for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter) { str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second); + callback_data.all_fields_set.insert(iter->first); } - for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter) + for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter) { str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter)); + callback_data.all_fields_set.insert(std::get<2>(*iter)); } str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)"; @@ -298,7 +329,7 @@ int main(int argc, char **argv) throw std::runtime_error("Failed to initialize audit"); } - auparse_add_callback(au.get(), auparse_callback, NULL, NULL); + auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL); std::vector data; data.resize(buffer_size);