/*
 * auditd-plugin-clickhouse is an auditd plugin for sending auditd data
 * to clickhouse DB.
 * Copyright (C) 2019-2020 Aleksei Nikiforov
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 */

#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

#include <auparse.h>
#include <libaudit.h>

#include <chrono>
#include <fstream>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <regex>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <vector>

#include <clickhouse/client.h>

#include <boost/filesystem.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/scope_exit.hpp>

#include "auditd-datatypes.hpp"
#include "logging.hpp"
#include "utils.hpp"

int runpipes[2] = { -1, -1 };
volatile bool running = true;
volatile bool got_signal = false;

int write_thread_control_pipes[2] = { -1, -1 };
int write_thread_data_pipes[2] = { -1, -1 };

std::mutex g_audit_records_list_mutex;
std::list<std::shared_ptr<AuditRecord> > g_audit_records_list;

template <typename T>
struct SharedPointerLess: std::binary_function<std::shared_ptr<T>, std::shared_ptr<T>, bool>
{
	constexpr bool operator()(const std::shared_ptr<T> &lhs, const std::shared_ptr<T> &rhs) const
	{
		return (lhs && rhs) ? (*lhs < *rhs) : (lhs.get() < rhs.get());
	}
};

struct CallbackData
{
	std::map<std::string, std::string> datatypes_map;
	std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
	// AbstractRecordField stands for the record-field interface declared in
	// auditd-datatypes.hpp; the exact class name is assumed here.
	std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>()> > type_creation_map;
	std::set<std::string> all_fields_set;

	clickhouse::Client *clickhouse_client;
	std::string table_name;

	int write_timeout;
	int write_count_limit;

	std::string data_directory;

	CallbackData()
		: clickhouse_client(nullptr),
		  write_timeout(-1),
		  write_count_limit(-1)
	{
	}
};

static void stop_running()
{
	running = false;

	if (runpipes[1] != -1)
	{
		write(runpipes[1], "1", 1);
	}
}

static void term_handler(int sig)
{
	got_signal = true;
	stop_running();
}

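/*
 * Creates one clickhouse column per configured audit field (plus the fixed
 * record_time/record_milli/record_serial/record_node columns and the
 * "unknown_field" catch-all) and, for every field, a prebuilt single-row
 * "empty" column set that fill_empty_columns() uses to pad rows in which
 * the field is absent.
 */
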
void initialize_data_block(
	std::map<std::string, clickhouse::ColumnRef> &data,
	std::map<std::string, bool> &data_present_map,
	// RecordFieldColumns: list of name/column pairs returned by
	// generateColumnsAndNames() (declared in auditd-datatypes.hpp; name assumed)
	std::map<std::string, RecordFieldColumns> &empty_data_map,
	const std::map<std::string, std::string> &datatypes_map,
	const std::list<std::tuple<std::string, std::string, std::string> > &datatype_regexps_map,
	const std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>()> > &generators)
{
	data["record_time"] = std::make_shared<clickhouse::ColumnDateTime>();
	data["record_milli"] = std::make_shared<clickhouse::ColumnUInt64>();
	data["record_serial"] = std::make_shared<clickhouse::ColumnUInt64>();
	data["record_node"] = std::make_shared<clickhouse::ColumnString>();

	for (auto iter = datatypes_map.begin(); iter != datatypes_map.end(); ++iter)
	{
		auto factory_iter = generators.find(iter->second);
		if (factory_iter != generators.end())
		{
			auto columns = factory_iter->second()->generateColumnsAndNames(sanitize_column_name(iter->first));

			for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
			{
				data[column_iter->name] = column_iter->value;
			}

			data_present_map[iter->first] = false;

			auto empty_record = factory_iter->second();
			auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name(iter->first));
			empty_record->addToColumn(empty_columns);
			empty_data_map[sanitize_column_name(iter->first)] = empty_columns;
		}
		else
		{
			Logger::write("Couldn't find registered type name for record with type \"%s\"", iter->second.c_str());
		}
	}

	for (auto iter = datatype_regexps_map.begin(); iter != datatype_regexps_map.end(); ++iter)
	{
		auto factory_iter = generators.find(std::get<1>(*iter));
		if (factory_iter != generators.end())
		{
			auto columns = factory_iter->second()->generateColumnsAndNames(sanitize_column_name(std::get<2>(*iter)));

			for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
			{
				data[column_iter->name] = column_iter->value;
			}

			data_present_map[std::get<2>(*iter)] = false;

			auto empty_record = factory_iter->second();
			auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name(std::get<2>(*iter)));
			empty_record->addToColumn(empty_columns);
			empty_data_map[sanitize_column_name(std::get<2>(*iter))] = empty_columns;
		}
		else
		{
			Logger::write("Couldn't find registered type name for record with type \"%s\"", std::get<2>(*iter).c_str());
		}
	}

	// also add "unknown_field"
	{
		auto columns = InterpretedStringArrayRecordField::createRecord()->generateColumnsAndNames(sanitize_column_name("unknown_field"));

		for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
		{
			data[column_iter->name] = column_iter->value;
		}

		data_present_map["unknown_field"] = false;

		auto empty_record = InterpretedStringArrayRecordField::createRecord();
		auto empty_columns = empty_record->generateColumnsAndNames(sanitize_column_name("unknown_field"));
		empty_record->addToColumn(empty_columns);
		empty_data_map[sanitize_column_name("unknown_field")] = empty_columns;
	}
}

void generate_clickhouse_columns_from_audit_records(
	std::map<std::string, clickhouse::ColumnRef> &data,
	std::map<std::string, bool> &data_present_map,
	const AuditRecord &record)
{
	ensure_not_null(data["record_time"]->As<clickhouse::ColumnDateTime>())->Append(record.time);
	ensure_not_null(data["record_milli"]->As<clickhouse::ColumnUInt64>())->Append(record.milliseconds);
	ensure_not_null(data["record_serial"]->As<clickhouse::ColumnUInt64>())->Append(record.serial);
	ensure_not_null(data["record_node"]->As<clickhouse::ColumnString>())->Append(record.node);

	for (auto iter = record.fields.begin(); iter != record.fields.end(); ++iter)
	{
		auto columns = iter->second->generateColumnsAndNames(sanitize_column_name(iter->first));
		iter->second->addToColumn(columns);

		for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
		{
			data[column_iter->name]->Append(column_iter->value);
		}

		data_present_map[iter->first] = true;
	}
}

void fill_empty_columns(
	std::map<std::string, clickhouse::ColumnRef> &data,
	const std::map<std::string, bool> &data_present_map,
	const std::map<std::string, RecordFieldColumns> &empty_data_map)
{
	for (auto iter = data_present_map.begin(); iter != data_present_map.end(); ++iter)
	{
		if (iter->second)
		{
			continue;
		}

		auto empty_data_iter = empty_data_map.find(sanitize_column_name(iter->first));
		if (empty_data_iter != empty_data_map.end())
		{
			for (auto empty_data_column_iter = empty_data_iter->second.begin(); empty_data_column_iter != empty_data_iter->second.end(); ++empty_data_column_iter)
			{
				data[empty_data_column_iter->name]->Append(empty_data_column_iter->value);
			}
		}
	}
}

void check_column_sizes_in_data(const std::map<std::string, clickhouse::ColumnRef> &data)
{
	// now check that each column has same size
	auto iter = data.begin();
	size_t count = iter->second->Size();

	for (++iter; iter != data.end(); ++iter)
	{
		if (count != iter->second->Size())
		{
			std::stringstream errorstr;
			errorstr << "Columns size doesn't match, expected " << count << ", got " << iter->second->Size();
			throw std::runtime_error(errorstr.str());
		}
	}
}

void add_audit_data_to_clickhouse_block(
	const std::map<std::string, clickhouse::ColumnRef> &data,
	clickhouse::Block &block)
{
	for (auto iter = data.begin(); iter != data.end(); ++iter)
	{
		block.AppendColumn(iter->first, iter->second);
	}
}

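/*
 * auparse callback: invoked by libauparse for every complete audit event fed
 * from stdin. For each record it collects the fields, maps every audit field
 * to a configured database field (exact name first, then the regexp rules,
 * falling back to "unknown_field"), optionally persists the record as JSON in
 * the data directory, and then either inserts it into clickhouse directly or
 * queues it for the writer thread.
 */
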
void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
	CallbackData *callback_data = reinterpret_cast<CallbackData *>(user_data);

	if ((!callback_data) || (auparse_first_record(au) <= 0))
	{
		return;
	}

	size_t record = 1;

	for (;;)
	{
		try
		{
			if (cb_event_type == AUPARSE_CB_EVENT_READY)
			{
				std::shared_ptr<AuditRecord> audit_record = std::make_shared<AuditRecord>();

				audit_record->time = auparse_get_time(au);
				audit_record->milliseconds = auparse_get_milli(au);
				audit_record->serial = auparse_get_serial(au);
				audit_record->node = string_or_null_and_free(auparse_get_node(au));

				if (auparse_first_field(au) > 0)
				{
					do
					{
						const std::string field_name = string_or_null(auparse_get_field_name(au));

						if (field_name != "node") // skip node since it's already processed
						{
							auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));

							std::string database_type;
							std::string database_name;

							// first search for this field name in datatypes map,
							// if it's not found there search all elements in datatypes regexp container
							{
								auto iter = callback_data->datatypes_map.find(field_name);
								if (iter != callback_data->datatypes_map.end())
								{
									database_type = iter->second;
									database_name = iter->first;
								}
								else
								{
									for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
									{
										std::regex audit_name_regex(std::get<0>(*regexp_iter));
										if (std::regex_match(field_name, audit_name_regex))
										{
											database_type = std::get<1>(*regexp_iter);
											database_name = std::get<2>(*regexp_iter);
											break;
										}
									}
								}
							}

							if (database_type.empty() || database_name.empty())
							{
								Logger::write("Couldn't find matching database entry for field with name \"%s\" and type \"%s\", putting it into \"unknown_field\" field", field_name.c_str(), field_type_to_string(field_type).c_str());
								database_type = "string_array";
								database_name = "unknown_field";
							}
							else if (!check_field_type(field_type, database_type, field_name))
							{
								Logger::write("Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual \"%s\"", field_name.c_str(), database_type.c_str(), field_type_to_string(field_type).c_str());
							}

							std::shared_ptr<AbstractRecordField> data_ptr;

							// If field is present in audit record, reuse it
							// and update its value,
							// otherwise create new one and register it
							{
								auto data_iter = audit_record->fields.find(database_name);
								if (data_iter != audit_record->fields.end())
								{
									data_ptr = data_iter->second;
								}
								else
								{
									auto iter = callback_data->type_creation_map.find(database_type);
									if (iter != callback_data->type_creation_map.end())
									{
										data_ptr = iter->second();
									}
									else
									{
										Logger::write("Warning: no creator function found for data type \"%s\", using \"string\" as fallback", database_type.c_str());
										data_ptr = InterpretedStringRecordField::createRecord();
									}

									audit_record->fields[database_name] = data_ptr;
								}
							}

							data_ptr->addOrUpdateValue(au);
						}
					} while (auparse_next_field(au) > 0);
				}

				if (not callback_data->data_directory.empty())
				{
					audit_record->filename = callback_data->data_directory + boost::filesystem::path::separator + generate_name_for_audit_record(*audit_record);
					boost::property_tree::write_json(audit_record->filename, audit_record->toPtree());
				}

				if (callback_data->clickhouse_client && (!callback_data->table_name.empty()))
				{
					// send data directly
					clickhouse::Block block;

					std::map<std::string, clickhouse::ColumnRef> data;
					std::map<std::string, bool> data_present_map;
					std::map<std::string, RecordFieldColumns> empty_data_map;

					initialize_data_block(data, data_present_map, empty_data_map, callback_data->datatypes_map, callback_data->datatype_regexps_map, callback_data->type_creation_map);
					generate_clickhouse_columns_from_audit_records(data, data_present_map, *audit_record);
					fill_empty_columns(data, data_present_map, empty_data_map);
					check_column_sizes_in_data(data);
					add_audit_data_to_clickhouse_block(data, block);

					callback_data->clickhouse_client->Insert(callback_data->table_name, block);

					// Data written, remove it
					if (not audit_record->filename.empty())
					{
						boost::filesystem::remove(audit_record->filename);
					}
				}
				else
				{
					// send data to buffering thread and notify it if necessary
					size_t current_count = 0;

					{ // lock
						std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
						g_audit_records_list.push_front(audit_record);
						current_count = g_audit_records_list.size();
					} // unlock

					if ((callback_data->write_count_limit > 0) && (current_count % static_cast<size_t>(callback_data->write_count_limit) == 0))
					{
						if (write_thread_data_pipes[1] != -1)
						{
							write(write_thread_data_pipes[1], "1", 1);
						}
					}
				}
			}
		}
		catch (const std::exception &e)
		{
			Logger::write("Caught exception while processing audit record %zu/%zu: %s", record, (size_t) auparse_get_num_records(au), e.what());
		}
		catch (...)
		{
			Logger::write("Caught unknown exception while processing audit record %zu/%zu", record, (size_t) auparse_get_num_records(au));
		}

		if (auparse_get_num_records(au) > record)
		{
			++record;
			auparse_next_record(au);
		}
		else
		{
			break;
		}
	}
}

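/*
 * Writer thread: buffers audit records queued in g_audit_records_list and
 * flushes them to clickhouse in a single block whenever the poll timeout
 * expires, the data pipe signals that the configured count limit was reached,
 * or a shutdown request arrives on the control pipe. After a successful
 * insert the written records are dropped from the shared list and their
 * on-disk JSON copies (if any) are removed.
 */
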
void writer_thread_function(clickhouse::Client &client, const std::string &table_name, int timeout, int count_limit, const std::string &data_directory)
{
	struct pollfd pollfds[2];
	pollfds[0].fd = write_thread_control_pipes[0];
	pollfds[1].fd = write_thread_data_pipes[0];

	auto current_time = std::chrono::steady_clock::now();
	auto last_time = current_time;

	bool thread_keep_running = true;

	while (thread_keep_running)
	{
		pollfds[0].events = POLLIN | POLLHUP;
		pollfds[0].revents = 0;
		pollfds[1].events = POLLIN | POLLHUP;
		pollfds[1].revents = 0;

		int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), timeout);

		current_time = std::chrono::steady_clock::now();

		bool write_data = false;
		bool poll_timeout = false;

		if (res < 0)
		{
			if (errno != EINTR)
			{
				// error occurred, finish running
				Logger::write("Poll returned error: %d", errno);
				stop_running();
				thread_keep_running = false;
			}
		}
		else if (res == 0)
		{
			// timeout
			write_data = true;
			poll_timeout = true;
		}
		else if (pollfds[0].revents & POLLIN)
		{
			// input data in control pipe means shutdown request
			thread_keep_running = false;
		}
		else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
		{
			// pipe closed too early, an error
			Logger::write("Read returned pollhup or pollerr");
			stop_running();
			thread_keep_running = false;
		}
		else if (pollfds[1].revents & POLLIN)
		{
			// data was received and the records count exceeded the limit, write it
			{
				char c;
				read(pollfds[1].fd, &c, sizeof(c));
			}

			write_data = true;
		}
		else if ((pollfds[1].revents & POLLHUP) || (pollfds[1].revents & POLLERR))
		{
			// pipe closed too early, an error
			Logger::write("Read returned pollhup or pollerr");
			stop_running();
			thread_keep_running = false;
		}

		if (write_data || (!thread_keep_running))
		{
			try
			{
				// write all saved up data and log about it
				std::list<std::shared_ptr<AuditRecord> > local_audit_records_list;

				{ // lock
					std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
					local_audit_records_list = g_audit_records_list;
				} // unlock

				bool write_and_log = (poll_timeout || (!thread_keep_running) || ((count_limit > 0) && (local_audit_records_list.size() >= static_cast<size_t>(count_limit))));

				if (write_and_log && (!local_audit_records_list.empty()))
				{
					Logger::write("Writer thread: preparing to write %zu elements", local_audit_records_list.size());

					clickhouse::Block block;

					std::map<std::string, clickhouse::ColumnRef> data;
					std::map<std::string, bool> data_present_map;
					std::map<std::string, RecordFieldColumns> empty_data_map;

					initialize_data_block(data, data_present_map, empty_data_map, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());

					for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
					{
						generate_clickhouse_columns_from_audit_records(data, data_present_map, **iter);
						fill_empty_columns(data, data_present_map, empty_data_map);
						check_column_sizes_in_data(data);

						// reset presence indication for next iteration
						for (auto data_present_iter = data_present_map.begin(); data_present_iter != data_present_map.end(); ++data_present_iter)
						{
							data_present_iter->second = false;
						}
					}

					add_audit_data_to_clickhouse_block(data, block);

					client.Insert(table_name, block);

					if (not data_directory.empty())
					{
						for (auto iter = local_audit_records_list.rbegin(); iter != local_audit_records_list.rend(); ++iter)
						{
							try
							{
								boost::filesystem::remove((*iter)->filename);
							}
							catch (const std::exception &e)
							{
								Logger::write("Writer thread: caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
							}
							catch (...)
							{
								Logger::write("Writer thread: caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
							}
						}
					}

					// successfully wrote data, now remove all written data
					{ // lock
						std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);
						g_audit_records_list.resize(g_audit_records_list.size() - local_audit_records_list.size());
					} // unlock
				}

				if (write_and_log)
				{
					Logger::write("Wrote %zu records in last %zu seconds", local_audit_records_list.size(), static_cast<size_t>(std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time).count()));
				}

				last_time = current_time;
			}
			catch (const std::exception &e)
			{
				Logger::write("Writer thread: caught exception: %s", e.what());
			}
			catch (...)
			{
				Logger::write("Writer thread: caught unknown exception");
			}

			last_time = current_time;
		}
	}
}

std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
	static const std::map<std::string, std::vector<std::string> > audit_table_map = {
		{ "string",       { " Nullable(String)" } },
		{ "integer",      { "_IntValue Nullable(Int64)", "_InterpretedValue LowCardinality(Nullable(String))" } },
		{ "string_array", { "_Name Array(String)", "_Value Array(String)" } }
	};

	std::vector<std::string> clickhouse_types;

	auto iter = audit_table_map.find(audit_type);
	if (iter != audit_table_map.end())
	{
		clickhouse_types = iter->second;
	}
	else
	{
		// Fallback to string
		Logger::write("Warning: unknown database type for record name \"%s\"", name.c_str());
		clickhouse_types.push_back(" Nullable(String)");
	}

	std::stringstream result;

	result << sanitize_column_name(name) + clickhouse_types[0];

	for (size_t i = 1; i < clickhouse_types.size(); ++i)
	{
		result << ", " << sanitize_column_name(name) + clickhouse_types[i];
	}

	return result.str();
}

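/*
 * The plugin is started as "auditd-plugin-clickhouse <config>" and reads audit
 * events from stdin. The configuration file is an INI file; the sections and
 * keys below mirror the parsing code in main(), the values are only
 * illustrative:
 *
 *   [General]
 *   DatatypesDescriptionFile = /etc/audit/auditd-clickhouse-datatypes.conf
 *   WriteTimeout = 60
 *   WriteCountLimit = 1000
 *   DataDirectory = /var/lib/auditd-plugin-clickhouse
 *
 *   [Connection]
 *   TableName = AuditData
 *   Hostname = localhost
 *   Port = 9000
 *   DefaultDatabase = default
 *   Username = default
 *   Password =
 *   PingBeforeQuery = true
 *   SendRetries = 3
 *   RetryTimeout = 5
 *   CompressionMethod = None
 *
 *   [Logging]
 *   Logfile = /var/log/auditd-plugin-clickhouse.log
 */
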
int main(int argc, char **argv)
{
	if (argc != 2)
	{
		Logger::write("Error: USAGE: %s config", argv[0]);
		return -1;
	}

	try
	{
		Logger::write("Initializing");

		clickhouse::ClientOptions client_options;
		std::string table_name = "AuditData";
		std::string datatypes_filename;
		std::string data_directory;
		int write_timeout = -1;
		int write_count_limit = -1;
		bool use_writer_thread = false;

		if (pipe(runpipes) < 0)
		{
			throw std::runtime_error("Failed to create pipes");
		}

		BOOST_SCOPE_EXIT(&runpipes)
		{
			close(runpipes[0]);
			close(runpipes[1]);
			runpipes[0] = -1;
			runpipes[1] = -1;
		} BOOST_SCOPE_EXIT_END;

		{
			struct sigaction sa;
			sa.sa_flags = 0;
			sigemptyset(&sa.sa_mask);
			sa.sa_handler = term_handler;

			if (sigaction(SIGTERM, &sa, NULL) < 0)
			{
				throw std::runtime_error("Failed to set sigterm handler");
			}
		}

		if (pipe(write_thread_control_pipes) < 0)
		{
			throw std::runtime_error("Failed to create pipes");
		}

		BOOST_SCOPE_EXIT(&write_thread_control_pipes)
		{
			close(write_thread_control_pipes[0]);
			close(write_thread_control_pipes[1]);
			write_thread_control_pipes[0] = -1;
			write_thread_control_pipes[1] = -1;
		} BOOST_SCOPE_EXIT_END;

		if (pipe(write_thread_data_pipes) < 0)
		{
			throw std::runtime_error("Failed to create pipes");
		}

		BOOST_SCOPE_EXIT(&write_thread_data_pipes)
		{
			close(write_thread_data_pipes[0]);
			close(write_thread_data_pipes[1]);
			write_thread_data_pipes[0] = -1;
			write_thread_data_pipes[1] = -1;
		} BOOST_SCOPE_EXIT_END;

		/* First read config */
		{
			boost::property_tree::ptree clickhouse_config_tree;
			std::ifstream stream(argv[1]);
			boost::property_tree::read_ini(stream, clickhouse_config_tree);

			auto general = clickhouse_config_tree.get_child_optional("General");
			if (general)
			{
				optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
				optional_set(write_timeout, *general, "WriteTimeout");
				optional_set(write_count_limit, *general, "WriteCountLimit");
				optional_set(data_directory, *general, "DataDirectory");

				if (!data_directory.empty())
				{
					try
					{
						if (boost::filesystem::create_directory(boost::filesystem::path(data_directory)))
						{
							// if directory is created, set permissions
							boost::filesystem::permissions(boost::filesystem::path(data_directory), boost::filesystem::owner_all);
						}
					}
					catch (const std::exception &exc)
					{
						Logger::write("Caught exception while creating data directory: %s", exc.what());
					}
					catch (...)
					{
						Logger::write("Caught unknown exception while creating data directory");
					}
				}

				write_timeout = (write_timeout > 0) ? (write_timeout * 1000) : -1;
			}

			auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
			if (client_connection)
			{
				optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);
				optional_set(client_options.host, *client_connection, "Hostname");
				optional_set(client_options.port, *client_connection, "Port");
				optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
				optional_set(client_options.user, *client_connection, "Username");
				optional_set(client_options.password, *client_connection, "Password");
				optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
				optional_set(client_options.send_retries, *client_connection, "SendRetries");

				{
					auto element = client_connection->get_child_optional("RetryTimeout");
					if (element)
					{
						client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
					}
				}

				{
					auto element = client_connection->get_child_optional("CompressionMethod");
					if (element)
					{
						const auto value = element->get_value<std::string>();

						const std::map<std::string, clickhouse::CompressionMethod> compression_method_map = {
							{ "None", clickhouse::CompressionMethod::None },
							{ "LZ4",  clickhouse::CompressionMethod::LZ4 },
						};

						auto iter = compression_method_map.find(value);
						if (iter != compression_method_map.end())
						{
							client_options.compression_method = iter->second;
						}
						else
						{
							client_options.compression_method = clickhouse::CompressionMethod::None;
						}
					}
				}
			}

			auto logging = clickhouse_config_tree.get_child_optional("Logging");
			if (logging)
			{
				std::string logfilename;
				optional_set(logfilename, *logging, "Logfile");

				if (!logfilename.empty())
				{
					Logger::open(logfilename.c_str());
				}
			}
		}

		Logger::initialize();
		Logger::dump();

		read_datatypes_map(datatypes_filename);

		// If no limits are set, don't use special writer thread
		use_writer_thread = ((write_timeout > 0) || (write_count_limit > 0));

		/* Now connect to clickhouse */
		clickhouse::Client client(client_options);

		CallbackData callback_data;
		callback_data.datatypes_map = get_datatypes_map();
		callback_data.datatype_regexps_map = get_datatype_regexps_map();
		callback_data.type_creation_map = get_type_creation_map();

		if (!use_writer_thread)
		{
			callback_data.clickhouse_client = &client;
			callback_data.table_name = table_name;
		}

		callback_data.write_timeout = write_timeout;
		callback_data.write_count_limit = write_count_limit;
		callback_data.data_directory = data_directory;

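		/*
		 * Create the target table if it does not exist yet. The generated
		 * statement has the form (the column list depends on the configured
		 * datatypes):
		 *
		 *   CREATE TABLE IF NOT EXISTS AuditData (record_time DateTime,
		 *     record_milli UInt64, record_serial UInt64, record_node String, ...)
		 *     ENGINE = MergeTree
		 *     ORDER BY (record_time, record_milli, record_serial, record_node)
		 */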
		{
			std::stringstream str;
			str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";

			for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
			{
				str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
				callback_data.all_fields_set.insert(iter->first);
			}

			for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
			{
				str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
				callback_data.all_fields_set.insert(std::get<2>(*iter));
			}

			str << ", " << construct_clickhouse_datatype_string("unknown_field", "string_array");
			callback_data.all_fields_set.insert("unknown_field");

			str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";

			client.Execute(str.str());
		}

		if (not data_directory.empty())
		{
			// read all previously saved data, and send it to writer thread or write it right here
			// and remove written files
			std::set<std::shared_ptr<AuditRecord>, SharedPointerLess<AuditRecord> > audit_records_set;

			for (auto iter = boost::filesystem::directory_iterator(data_directory); iter != boost::filesystem::directory_iterator(); ++iter)
			{
				Logger::write("Reading saved file: %s", iter->path().c_str());

				try
				{
					boost::property_tree::ptree file_data;
					boost::property_tree::read_json(iter->path().native(), file_data);

					auto audit_record = AuditRecord::fromPtree(file_data);
					audit_record->filename = iter->path().native();
					audit_records_set.insert(audit_record);
				}
				catch (const std::exception &exc)
				{
					Logger::write("Error while reading saved file \"%s\": %s", iter->path().c_str(), exc.what());
				}
				catch (...)
				{
					Logger::write("Unknown error while reading saved file \"%s\"", iter->path().c_str());
				}
			}

			if (use_writer_thread)
			{
				// send data to buffering thread and notify it if necessary
				size_t current_count = 0;

				{ // lock
					std::lock_guard<std::mutex> audit_records_list_lock(g_audit_records_list_mutex);

					for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
					{
						g_audit_records_list.push_front(*iter);
					}

					current_count = g_audit_records_list.size();
				} // unlock

				if ((write_count_limit > 0) && (current_count >= static_cast<size_t>(write_count_limit)))
				{
					if (write_thread_data_pipes[1] != -1)
					{
						write(write_thread_data_pipes[1], "1", 1);
					}
				}

				Logger::write("Sent all saved data to writer thread");
			}
			else
			{
				clickhouse::Block block;

				std::map<std::string, clickhouse::ColumnRef> data_map;
				std::map<std::string, bool> data_present_map;
				std::map<std::string, RecordFieldColumns> empty_data_map;

				initialize_data_block(data_map, data_present_map, empty_data_map, get_datatypes_map(), get_datatype_regexps_map(), get_type_creation_map());

				for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
				{
					generate_clickhouse_columns_from_audit_records(data_map, data_present_map, **iter);
					fill_empty_columns(data_map, data_present_map, empty_data_map);
					check_column_sizes_in_data(data_map);

					// reset presence indication for next iteration
					for (auto data_present_iter = data_present_map.begin(); data_present_iter != data_present_map.end(); ++data_present_iter)
					{
						data_present_iter->second = false;
					}
				}

				add_audit_data_to_clickhouse_block(data_map, block);

				client.Insert(table_name, block);

				for (auto iter = audit_records_set.begin(); iter != audit_records_set.end(); ++iter)
				{
					try
					{
						boost::filesystem::remove((*iter)->filename);
					}
					catch (const std::exception &e)
					{
						Logger::write("Caught exception while trying to remove data file \"%s\": %s", (*iter)->filename.c_str(), e.what());
					}
					catch (...)
					{
						Logger::write("Caught unknown exception while trying to remove data file \"%s\"", (*iter)->filename.c_str());
					}
				}
			}

			Logger::write("Wrote all saved data to DB");
		}

		Logger::write("Initializing data processing");

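		/*
		 * The writer thread is wrapped in a unique_ptr with a custom deleter:
		 * on destruction it signals the control pipe so the thread leaves its
		 * poll loop, joins it, and only then deletes the std::thread object.
		 * When no write limits are configured the pointer simply stays null.
		 */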
		std::unique_ptr<std::thread, std::function<void(std::thread *)> > writer_thread(
			use_writer_thread
				? new std::thread(
					writer_thread_function,
					std::ref(client),
					std::cref(table_name),
					write_timeout,
					write_count_limit,
					std::cref(data_directory))
				: nullptr,
			[] (std::thread *thread)
			{
				if (thread)
				{
					if (thread->joinable())
					{
						if (write_thread_control_pipes[1] != -1)
						{
							write(write_thread_control_pipes[1], "1", 1);
						}

						thread->join();
					}

					delete thread;
				}
			});

		std::unique_ptr<auparse_state_t, std::function<void(auparse_state_t *)> > au(
			auparse_init(AUSOURCE_FEED, 0),
			[] (auparse_state_t *obj)
			{
				if (obj)
				{
					auparse_flush_feed(obj);
					auparse_destroy(obj);
				}
			});

		if (!au)
		{
			throw std::runtime_error("Failed to initialize audit");
		}

		auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);

		char data[MAX_AUDIT_MESSAGE_LENGTH];

		struct pollfd pollfds[2];
		pollfds[0].fd = STDIN_FILENO;
		pollfds[1].fd = runpipes[0];

		Logger::write("Initialization finished");

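		/*
		 * Main loop: poll stdin (audit events from auditd) and the runpipes
		 * used by the signal handler via stop_running(). Every chunk read from
		 * stdin is fed to libauparse and flushed immediately; the loop ends on
		 * read/poll errors, on SIGTERM, or when stdin is closed.
		 */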
		while (running)
		{
			pollfds[0].events = POLLIN | POLLHUP;
			pollfds[0].revents = 0;
			pollfds[1].events = POLLIN | POLLHUP;
			pollfds[1].revents = 0;

			int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);

			if (res < 0)
			{
				if (errno != EINTR)
				{
					// error occurred, finish running
					Logger::write("Poll returned error: %d", errno);
					running = false;
				}
			}
			else if (res == 0)
			{
				// timeout, do nothing
			}
			else if (pollfds[0].revents & POLLIN)
			{
				ssize_t readsize = read(STDIN_FILENO, data, sizeof(data));
				if (readsize > 0)
				{
					auparse_feed(au.get(), data, readsize);

					// assume that data wasn't split, and flush it
					// otherwise, libauparse keeps increasing its memory footprint
					auparse_flush_feed(au.get());
				}
				else
				{
					// error occurred, finish running
					Logger::write("Read returned error: %d", errno);
					running = false;
				}
			}
			else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
			{
				// stdin closed, no more data, finish running
				Logger::write("Stdin hangup or error, stopping");
				running = false;
			}
		}

		Logger::write("Finishing running, signal received: %s", got_signal ? "true" : "false");
	}
	catch (const std::exception &e)
	{
		Logger::write("Caught exception: %s", e.what());
		return -1;
	}
	catch (...)
	{
		Logger::write("Caught unknown exception");
		return -1;
	}

	try
	{
		Logger::write("Finished running");
	}
	catch (...)
	{
		// ignore exceptions here
	}

	return 0;
}