Serialize and save data before sending it to write thread
This commit is contained in:
parent
be7e252971
commit
a42ec11de1
@ -11,7 +11,7 @@ set(BIN_INSTALL_LIBEXEC ${CMAKE_INSTALL_PREFIX}/libexec CACHE PATH "Installation
|
||||
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
find_package(Boost REQUIRED COMPONENTS filesystem)
|
||||
find_package(ClickhouseCpp REQUIRED)
|
||||
pkg_check_modules(AUPARSE auparse REQUIRED)
|
||||
|
||||
@ -33,7 +33,7 @@ set(HEADERS
|
||||
)
|
||||
|
||||
add_executable( auditd-plugin-clickhouse ${SOURCES} ${HEADERS} )
|
||||
target_link_libraries( auditd-plugin-clickhouse ${CLICKHOUSECPP_LIBRARIES} ${AUPARSE_LIBRARIES} Threads::Threads )
|
||||
target_link_libraries( auditd-plugin-clickhouse ${CLICKHOUSECPP_LIBRARIES} ${AUPARSE_LIBRARIES} Threads::Threads Boost::filesystem )
|
||||
|
||||
install(TARGETS auditd-plugin-clickhouse RUNTIME DESTINATION ${BIN_INSTALL_LIBEXEC} )
|
||||
install(FILES auditd-plugin-clickhouse.conf DESTINATION ${SYSCONF_INSTALL_DIR}/audisp/plugins.d )
|
||||
|
@ -7,6 +7,9 @@ WriteTimeout=60
|
||||
# Write data to database when specified count of data records is buffered
|
||||
# Zero or negative value disables this feature
|
||||
WriteCountLimit=10000
|
||||
# Directory where a copy of data is kept until it's sent to database.
|
||||
# Feature is disabled if no value or an empty value is set.
|
||||
DataDirectory=/var/lib/auditd-plugin-clickhouse
|
||||
|
||||
[Connection]
|
||||
Hostname=localhost
|
||||
|
@ -43,7 +43,10 @@
|
||||
#include <boost/scope_exit.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
#include <boost/property_tree/ini_parser.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/optional.hpp>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include <regex>
|
||||
|
||||
@ -72,6 +75,8 @@ struct CallbackData
|
||||
int write_timeout;
|
||||
int write_count_limit;
|
||||
|
||||
std::string data_directory;
|
||||
|
||||
CallbackData()
|
||||
: clickhouse_client(nullptr),
|
||||
write_timeout(-1),
|
||||
@ -359,6 +364,15 @@ void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, voi
|
||||
}
|
||||
}
|
||||
|
||||
std::string data_filename;
|
||||
|
||||
if (not callback_data->data_directory.empty())
|
||||
{
|
||||
data_filename = callback_data->data_directory + boost::filesystem::path::separator + generate_name_for_audit_record(audit_record);
|
||||
|
||||
boost::property_tree::write_json(data_filename, audit_record.toPtree());
|
||||
}
|
||||
|
||||
if (callback_data->clickhouse_client && (!callback_data->table_name.empty()))
|
||||
{
|
||||
// send data directly
|
||||
@ -592,6 +606,7 @@ int main(int argc, char **argv)
|
||||
std::string table_name = "AuditData";
|
||||
size_t buffer_size = 4096;
|
||||
std::string datatypes_filename;
|
||||
std::string data_directory;
|
||||
|
||||
int write_timeout = -1;
|
||||
int write_count_limit = -1;
|
||||
@ -663,6 +678,27 @@ int main(int argc, char **argv)
|
||||
optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
|
||||
optional_set(write_timeout, *general, "WriteTimeout");
|
||||
optional_set(write_count_limit, *general, "WriteCountLimit");
|
||||
optional_set(data_directory, *general, "DataDirectory");
|
||||
|
||||
if (!data_directory.empty())
|
||||
{
|
||||
try
|
||||
{
|
||||
if (boost::filesystem::create_directory(boost::filesystem::path(data_directory)))
|
||||
{
|
||||
// if directory is created, set permissions
|
||||
boost::filesystem::permissions(boost::filesystem::path(data_directory), boost::filesystem::owner_all);
|
||||
}
|
||||
}
|
||||
catch (const std::exception &exc)
|
||||
{
|
||||
Logger::write("Caught exception while creating data directory: %s\n", exc.what());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
Logger::write("Caught unknown exception while creating data directory\n");
|
||||
}
|
||||
}
|
||||
|
||||
write_timeout = (write_timeout > 0) ? (write_timeout * 1000) : -1;
|
||||
}
|
||||
@ -748,6 +784,8 @@ int main(int argc, char **argv)
|
||||
callback_data.write_timeout = write_timeout;
|
||||
callback_data.write_count_limit = write_count_limit;
|
||||
|
||||
callback_data.data_directory = data_directory;
|
||||
|
||||
{
|
||||
std::stringstream str;
|
||||
str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";
|
||||
|
@ -25,6 +25,8 @@
|
||||
#include <functional>
|
||||
#include <set>
|
||||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include <clickhouse-cpp/columns/array.h>
|
||||
#include <clickhouse-cpp/columns/nullable.h>
|
||||
#include <clickhouse-cpp/columns/numeric.h>
|
||||
@ -204,6 +206,15 @@ std::string field_type_to_string(auparse_type_t field_type)
|
||||
}
|
||||
}
|
||||
|
||||
std::string generate_name_for_audit_record(const AuditRecord &record)
|
||||
{
|
||||
return boost::lexical_cast<std::string>(record.time)
|
||||
+ "_" + boost::lexical_cast<std::string>(record.milliseconds)
|
||||
+ "_" + boost::lexical_cast<std::string>(record.serial)
|
||||
+ "_" + record.node
|
||||
+ ".json";
|
||||
}
|
||||
|
||||
AbstractRecordField::AbstractRecordField(const std::string &name)
|
||||
: m_name(name)
|
||||
{
|
||||
@ -257,6 +268,16 @@ void CommonStringRecordField::addToColumn(const std::vector<Column> &columns) co
|
||||
columns[0].value->Append(std::make_shared<clickhouse::ColumnNullable>(data_column, null_column));
|
||||
}
|
||||
|
||||
const boost::optional<std::string>& CommonStringRecordField::getStringValue() const
|
||||
{
|
||||
return m_value;
|
||||
}
|
||||
|
||||
void CommonStringRecordField::setStringValue(const boost::optional<std::string> &value)
|
||||
{
|
||||
m_value = value;
|
||||
}
|
||||
|
||||
std::shared_ptr<AbstractRecordField> StringRecordField::createRecord(const std::string &name)
|
||||
{
|
||||
std::shared_ptr<AbstractRecordField> result;
|
||||
@ -396,6 +417,16 @@ AbstractRecordField::Type IntegerRecordField::getType() const
|
||||
return AbstractRecordField::Type::Int;
|
||||
}
|
||||
|
||||
const boost::optional<int>& IntegerRecordField::getIntValue() const
|
||||
{
|
||||
return m_int_value;
|
||||
}
|
||||
|
||||
void IntegerRecordField::setIntValue(const boost::optional<int> &value)
|
||||
{
|
||||
m_int_value = value;
|
||||
}
|
||||
|
||||
std::shared_ptr<AbstractRecordField> InterpretedStringArrayRecordField::createRecord(const std::string &name)
|
||||
{
|
||||
std::shared_ptr<AbstractRecordField> result;
|
||||
@ -464,3 +495,113 @@ AbstractRecordField::Type InterpretedStringArrayRecordField::getType() const
|
||||
{
|
||||
return AbstractRecordField::Type::InterpretedStringArray;
|
||||
}
|
||||
|
||||
const std::list<std::string>& InterpretedStringArrayRecordField::getNamesArray() const
|
||||
{
|
||||
return m_names_array;
|
||||
}
|
||||
|
||||
const std::list<std::string>& InterpretedStringArrayRecordField::getValuesArray() const
|
||||
{
|
||||
return m_values_array;
|
||||
}
|
||||
|
||||
void InterpretedStringArrayRecordField::setArrays(std::list<std::string> names_array, std::list<std::string> values_array)
|
||||
{
|
||||
if (names_array.size() != values_array.size())
|
||||
{
|
||||
throw std::runtime_error("InterpretedStringArrayRecordField::setArrays: array sizes mismatch");
|
||||
}
|
||||
|
||||
m_names_array.swap(names_array);
|
||||
m_values_array.swap(values_array);
|
||||
}
|
||||
|
||||
boost::property_tree::ptree AuditRecord::toPtree() const
|
||||
{
|
||||
boost::property_tree::ptree data;
|
||||
|
||||
data.put_child("time", boost::property_tree::ptree(boost::lexical_cast<std::string>(this->time)));
|
||||
data.put_child("milliseconds", boost::property_tree::ptree(boost::lexical_cast<std::string>(this->milliseconds)));
|
||||
data.put_child("serial", boost::property_tree::ptree(boost::lexical_cast<std::string>(this->serial)));
|
||||
data.put_child("node", boost::property_tree::ptree(this->node));
|
||||
|
||||
boost::property_tree::ptree fields_data;
|
||||
|
||||
for (auto iter = this->fields.begin(); iter != this->fields.end(); ++iter)
|
||||
{
|
||||
boost::property_tree::ptree item;
|
||||
|
||||
item.put_child("type", boost::property_tree::ptree(boost::lexical_cast<std::string>(static_cast<int>(iter->second->getType()))));
|
||||
|
||||
switch (iter->second->getType())
|
||||
{
|
||||
case AbstractRecordField::Type::Int:
|
||||
{
|
||||
auto ptr = std::dynamic_pointer_cast<IntegerRecordField>(iter->second);
|
||||
if (ptr)
|
||||
{
|
||||
auto int_value = ptr->getIntValue();
|
||||
if (int_value)
|
||||
{
|
||||
item.put_child("value_int", boost::property_tree::ptree(boost::lexical_cast<std::string>(*int_value)));
|
||||
}
|
||||
|
||||
auto str_value = ptr->getStringValue();
|
||||
if (str_value)
|
||||
{
|
||||
item.put_child("value_str", boost::property_tree::ptree(*str_value));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case AbstractRecordField::Type::String:
|
||||
case AbstractRecordField::Type::InterpretedString:
|
||||
{
|
||||
auto ptr = std::dynamic_pointer_cast<CommonStringRecordField>(iter->second);
|
||||
if (ptr)
|
||||
{
|
||||
auto str_value = ptr->getStringValue();
|
||||
if (str_value)
|
||||
{
|
||||
item.put_child("value_str", boost::property_tree::ptree(*str_value));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case AbstractRecordField::Type::InterpretedStringArray:
|
||||
{
|
||||
auto ptr = std::dynamic_pointer_cast<InterpretedStringArrayRecordField>(iter->second);
|
||||
if (ptr)
|
||||
{
|
||||
const auto &names_list = ptr->getNamesArray();
|
||||
const auto &values_list = ptr->getValuesArray();
|
||||
|
||||
boost::property_tree::ptree names_tree, values_tree;
|
||||
|
||||
for (auto array_iter = names_list.begin(); array_iter != names_list.end(); ++array_iter)
|
||||
{
|
||||
names_tree.push_back(std::make_pair(std::string(), boost::property_tree::ptree(*array_iter)));
|
||||
}
|
||||
|
||||
for (auto array_iter = values_list.begin(); array_iter != values_list.end(); ++array_iter)
|
||||
{
|
||||
values_tree.push_back(std::make_pair(std::string(), boost::property_tree::ptree(*array_iter)));
|
||||
}
|
||||
|
||||
item.put_child("names", names_tree);
|
||||
item.put_child("values", values_tree);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
fields_data.put_child(iter->first, item);
|
||||
}
|
||||
|
||||
data.put_child("fields", fields_data);
|
||||
|
||||
return data;
|
||||
}
|
||||
|
@ -30,14 +30,18 @@
|
||||
#include <stdint.h>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
|
||||
#include <auparse.h>
|
||||
#include <libaudit.h>
|
||||
|
||||
#include <clickhouse-cpp/columns/column.h>
|
||||
|
||||
struct AuditRecord;
|
||||
|
||||
bool check_field_type(auparse_type_t field_type, const std::string &database_type, const std::string &database_field_name);
|
||||
std::string field_type_to_string(auparse_type_t field_type);
|
||||
std::string generate_name_for_audit_record(const AuditRecord &record);
|
||||
|
||||
class AbstractRecordField
|
||||
{
|
||||
@ -87,6 +91,8 @@ struct AuditRecord
|
||||
std::string node; // skip processing node from record fields
|
||||
|
||||
std::map<std::string, std::shared_ptr<AbstractRecordField> > fields;
|
||||
|
||||
boost::property_tree::ptree toPtree() const;
|
||||
};
|
||||
|
||||
class CommonStringRecordField: public AbstractRecordField
|
||||
@ -95,6 +101,9 @@ public:
|
||||
virtual std::vector<Column> generateColumnsAndNames() const override;
|
||||
virtual void addToColumn(const std::vector<Column> &columns) const override;
|
||||
|
||||
const boost::optional<std::string>& getStringValue() const;
|
||||
void setStringValue(const boost::optional<std::string> &value);
|
||||
|
||||
protected:
|
||||
explicit CommonStringRecordField(const std::string &name);
|
||||
|
||||
@ -135,6 +144,9 @@ public:
|
||||
virtual void addToColumn(const std::vector<Column> &columns) const override;
|
||||
virtual Type getType() const override;
|
||||
|
||||
const boost::optional<int>& getIntValue() const;
|
||||
void setIntValue(const boost::optional<int> &value);
|
||||
|
||||
protected:
|
||||
explicit IntegerRecordField(const std::string &name);
|
||||
|
||||
@ -151,6 +163,10 @@ public:
|
||||
virtual void addToColumn(const std::vector<Column> &columns) const override;
|
||||
virtual Type getType() const override;
|
||||
|
||||
const std::list<std::string>& getNamesArray() const;
|
||||
const std::list<std::string>& getValuesArray() const;
|
||||
void setArrays(std::list<std::string> names_array, std::list<std::string> values_array);
|
||||
|
||||
protected:
|
||||
explicit InterpretedStringArrayRecordField(const std::string &name);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user