/*
 * auditd-plugin-clickhouse is an auditd plugin for sending auditd data
 * to clickhouse DB.
 * Copyright (C) 2019 Aleksei Nikiforov <darktemplar@basealt.ru>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 */

#include <signal.h>
#include <poll.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>

#include <string>
#include <map>
#include <list>
#include <set>
#include <vector>
#include <tuple>
#include <memory>
#include <functional>
#include <algorithm>
#include <cctype>
#include <cstring>
#include <cstdio>
#include <chrono>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <regex>

#include <auparse.h>
#include <libaudit.h>

#include <clickhouse-cpp/client.h>

#include <boost/scope_exit.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/optional.hpp>

#include "auditd-datatypes.hpp"

int runpipes[2] = { -1, -1 };
volatile bool running = true;

struct CallbackData
{
	std::map<std::string, std::string> datatypes_map;
	std::list<std::tuple<std::string, std::string, std::string> > datatype_regexps_map;
	std::map<std::string, std::function<std::shared_ptr<AbstractRecordField>(const std::string &name)> > type_creation_map;
	std::set<std::string> all_fields_set;
	clickhouse::Client *clickhouse_client;
	std::string table_name;

	CallbackData()
		: clickhouse_client(nullptr)
	{
	}
};

static void term_handler(int sig)
{
	if (runpipes[1] != -1)
	{
		write(runpipes[1], "1", 1);
	}

	running = false;
}

bool is_valid_table_name(const std::string &value)
{
	return (std::find_if_not(value.begin(), value.end(), [](const char c) { return (std::isalnum(c) || (strchr("_", c) != NULL)); }) == value.end());
}

template <typename T>
bool always_true(const T&)
{
	return true;
}

template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name, bool (*check_function)(const T&))
{
	auto element = tree.get_child_optional(element_name);
	if (element)
	{
		if (check_function(element->get_value<T>()))
		{
			value = element->get_value<T>();
		}
		else
		{
			throw std::runtime_error(std::string("Invalid value for option '") + std::string(element_name) + std::string("'"));
		}
	}
}

template <typename T>
void optional_set(T &value, const boost::property_tree::ptree &tree, const char *element_name)
{
	optional_set(value, tree, element_name, &always_true<T>);
}

std::string sanitize_column_name(const std::string &name)
{
	auto result = name;
	std::replace(result.begin(), result.end(), '-', '_');
	return result;
}

void auparse_callback(auparse_state_t *au, auparse_cb_event_t cb_event_type, void *user_data)
{
	CallbackData *callback_data = reinterpret_cast<CallbackData*>(user_data);
	if ((!callback_data) || (auparse_first_record(au) <= 0))
	{
		return;
	}

	size_t record = 1;

	for (;;)
	{
		try
		{
			if (cb_event_type == AUPARSE_CB_EVENT_READY)
			{
				AuditRecord audit_record;

				audit_record.time = auparse_get_time(au);
				audit_record.milliseconds = auparse_get_milli(au);
				audit_record.serial = auparse_get_serial(au);
				audit_record.node = auparse_get_node(au);

				if (auparse_first_field(au) > 0)
				{
					do
					{
						const std::string field_name = auparse_get_field_name(au);

						if (field_name != "node") // skip node since it's already processed
						{
							auparse_type_t field_type = static_cast<auparse_type_t>(auparse_get_field_type(au));
							std::string database_type;
							std::string database_name;

							// first search for this field name in datatypes map,
							// if it's not found there search all elements in datatypes regexp container
							{
								auto iter = callback_data->datatypes_map.find(field_name);
								if (iter != callback_data->datatypes_map.end())
								{
									database_type = iter->second;
									database_name = iter->first;
								}
								else
								{
									for (auto regexp_iter = callback_data->datatype_regexps_map.begin(); regexp_iter != callback_data->datatype_regexps_map.end(); ++regexp_iter)
									{
										std::regex audit_name_regex(std::get<0>(*regexp_iter));
										if (std::regex_match(field_name, audit_name_regex))
										{
											database_type = std::get<1>(*regexp_iter);
											database_name = std::get<2>(*regexp_iter);
											break;
										}
									}
								}
							}

							if (database_type.empty() || database_name.empty())
							{
								fprintf(stderr, "Couldn't find matching database entry for field with name \"%s\" and type %d\n", field_name.c_str(), (int) field_type);
								continue;
							}

							if (!check_field_type(field_type, database_type, field_name))
							{
								fprintf(stderr, "Warning: expected datatype doesn't match database datatype for field \"%s\": expected \"%s\", actual %d\n",
									field_name.c_str(), database_type.c_str(), field_type);
							}

							std::shared_ptr<AbstractRecordField> data_ptr;

							// If the field is already present in the audit record, reuse it
							// and update its value; otherwise create a new one and register it
							{
								auto data_iter = audit_record.fields.find(database_name);
								if (data_iter != audit_record.fields.end())
								{
									data_ptr = data_iter->second;
								}
								else
								{
									auto iter = callback_data->type_creation_map.find(database_type);
									if (iter != callback_data->type_creation_map.end())
									{
										data_ptr = iter->second(database_name);
									}
									else
									{
										fprintf(stderr, "Warning: no creator function found for data type \"%s\", using \"string\" as fallback\n", database_type.c_str());
										data_ptr = InterpretedStringRecordField::createRecord(database_name);
									}

									audit_record.fields[database_name] = data_ptr;
								}
							}

							data_ptr->addOrUpdateValue(au);
						}
					} while (auparse_next_field(au) > 0);
				}

				// first add all missing fields, keep data empty
				{
					auto missing_fields = callback_data->all_fields_set;

					for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
					{
						missing_fields.erase(iter->first);
					}

					for (auto iter = missing_fields.begin(); iter != missing_fields.end(); ++iter)
					{
						std::string type_name;

						auto type_iter = callback_data->datatypes_map.find(*iter);
						if (type_iter != callback_data->datatypes_map.end())
						{
							type_name = type_iter->second;
						}
						else
						{
							for (auto regex_type_iter = callback_data->datatype_regexps_map.begin(); regex_type_iter != callback_data->datatype_regexps_map.end(); ++regex_type_iter)
							{
								if (*iter == std::get<2>(*regex_type_iter))
								{
									type_name = std::get<1>(*regex_type_iter);
									break;
								}
							}
						}

						if (!type_name.empty())
						{
							auto factory_iter = callback_data->type_creation_map.find(type_name);
							if (factory_iter != callback_data->type_creation_map.end())
							{
								audit_record.fields[*iter] = factory_iter->second(*iter);
							}
						}
						else
						{
							fprintf(stderr, "Couldn't find registered type name for record with name \"%s\"\n", iter->c_str());
							continue;
						}
					}
				}

				clickhouse::Block block;

				auto record_time_record = std::make_shared<clickhouse::ColumnDateTime>();
				record_time_record->Append(audit_record.time);
				block.AppendColumn("record_time", record_time_record);

				auto record_milliseconds_record = std::make_shared<clickhouse::ColumnUInt64>();
				record_milliseconds_record->Append(audit_record.milliseconds);
				block.AppendColumn("record_milli", record_milliseconds_record);

				auto record_serial_record = std::make_shared<clickhouse::ColumnUInt64>();
				record_serial_record->Append(audit_record.serial);
				block.AppendColumn("record_serial", record_serial_record);

				auto record_node_record = std::make_shared<clickhouse::ColumnString>();
				record_node_record->Append(audit_record.node);
				block.AppendColumn("record_node", record_node_record);

				for (auto iter = audit_record.fields.begin(); iter != audit_record.fields.end(); ++iter)
				{
					auto columns = iter->second->generateColumnsAndNames();
					iter->second->addToColumn(columns);

					for (auto column_iter = columns.begin(); column_iter != columns.end(); ++column_iter)
					{
						block.AppendColumn(sanitize_column_name(column_iter->name), column_iter->value);
					}
				}

				callback_data->clickhouse_client->Insert(callback_data->table_name, block);
			}
		}
		catch (const std::exception &e)
		{
			fprintf(stderr, "Caught exception while processing audit record %zu/%zu: %s\n", record, (size_t) auparse_get_num_records(au), e.what());
		}
		catch (...)
		{
			fprintf(stderr, "Caught unknown exception while processing audit record %zu/%zu\n", record, (size_t) auparse_get_num_records(au));
		}

		if (auparse_get_num_records(au) > record)
		{
			++record;
			auparse_next_record(au);
		}
		else
		{
			break;
		}
	}
}

std::string construct_clickhouse_datatype_string(const std::string &name, const std::string &audit_type)
{
	static const std::map<std::string, std::vector<std::string> > audit_table_map = {
		{ "string",       { " Nullable(String)" } },
		{ "integer",      { "_IntValue Nullable(Int64)", "_InterpretedValue LowCardinality(Nullable(String))" } },
		{ "string_array", { "_Name Array(String)", "_Value Array(String)" } }
	};

	std::vector<std::string> clickhouse_types;

	auto iter = audit_table_map.find(audit_type);
	if (iter != audit_table_map.end())
	{
		clickhouse_types = iter->second;
	}
	else
	{
		// Fallback to string
		fprintf(stderr, "Warning: unknown database type for record name \"%s\"\n", name.c_str());
		clickhouse_types.push_back(" Nullable(String)");
	}

	std::stringstream result;

	result << sanitize_column_name(name) << clickhouse_types[0];

	for (size_t i = 1; i < clickhouse_types.size(); ++i)
	{
		result << ", " << sanitize_column_name(name) << clickhouse_types[i];
	}

	return result.str();
}
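
// For illustration only (hypothetical field names, derived from the mapping above):
//   construct_clickhouse_datatype_string("auid", "integer")
//     -> "auid_IntValue Nullable(Int64), auid_InterpretedValue LowCardinality(Nullable(String))"
//   construct_clickhouse_datatype_string("exe", "string")
//     -> "exe Nullable(String)"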

int main(int argc, char **argv)
{
	if (argc != 2)
	{
		fprintf(stderr, "Error: USAGE: %s config\n", argv[0]);
		return -1;
	}

	try
	{
		clickhouse::ClientOptions client_options;
		std::string table_name = "AuditData";
		size_t buffer_size = 4096;
		std::string datatypes_filename;

		if (pipe(runpipes) < 0)
		{
			throw std::runtime_error("Failed to create pipes");
		}

		BOOST_SCOPE_EXIT(&runpipes)
		{
			close(runpipes[0]);
			close(runpipes[1]);
			runpipes[0] = -1;
			runpipes[1] = -1;
		} BOOST_SCOPE_EXIT_END

		{
			struct sigaction sa;
			sa.sa_flags = 0;
			sigemptyset(&sa.sa_mask);
			sa.sa_handler = term_handler;

			if (sigaction(SIGTERM, &sa, NULL) < 0)
			{
				throw std::runtime_error("Failed to set sigterm handler");
			}
		}

		/* First read config */
		{
			boost::property_tree::ptree clickhouse_config_tree;

			std::ifstream stream(argv[1]);
			boost::property_tree::read_ini(stream, clickhouse_config_tree);

			auto general = clickhouse_config_tree.get_child_optional("General");
			if (general)
			{
				optional_set(buffer_size, *general, "BufferSize");
				optional_set(datatypes_filename, *general, "DatatypesDescriptionFile");
			}

			auto client_connection = clickhouse_config_tree.get_child_optional("Connection");
			if (client_connection)
			{
				optional_set(table_name, *client_connection, "TableName", &is_valid_table_name);

				optional_set(client_options.host, *client_connection, "Hostname");
				optional_set(client_options.port, *client_connection, "Port");
				optional_set(client_options.default_database, *client_connection, "DefaultDatabase");
				optional_set(client_options.user, *client_connection, "Username");
				optional_set(client_options.password, *client_connection, "Password");
				optional_set(client_options.ping_before_query, *client_connection, "PingBeforeQuery");
				optional_set(client_options.send_retries, *client_connection, "SendRetries");

				{
					auto element = client_connection->get_child_optional("RetryTimeout");
					if (element)
					{
						client_options.retry_timeout = std::chrono::seconds(element->get_value<int>());
					}
				}

				{
					auto element = client_connection->get_child_optional("CompressionMethod");
					if (element)
					{
						const auto value = element->get_value<std::string>();

						const std::map<std::string, clickhouse::CompressionMethod> compression_method_map =
						{
							{ "None", clickhouse::CompressionMethod::None },
							{ "LZ4",  clickhouse::CompressionMethod::LZ4 },
						};

						auto iter = compression_method_map.find(value);
						if (iter != compression_method_map.end())
						{
							client_options.compression_method = iter->second;
						}
						else
						{
							client_options.compression_method = clickhouse::CompressionMethod::None;
						}
					}
				}
			}
		}
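
		// An illustrative config file for the options parsed above
		// (section and key names come from this parser; all values are hypothetical):
		//
		//   [General]
		//   BufferSize = 4096
		//   DatatypesDescriptionFile = /etc/audit/auditd-clickhouse-datatypes.conf
		//
		//   [Connection]
		//   TableName = AuditData
		//   Hostname = localhost
		//   Port = 9000
		//   DefaultDatabase = default
		//   Username = default
		//   PingBeforeQuery = true
		//   SendRetries = 3
		//   RetryTimeout = 5
		//   CompressionMethod = None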

		read_datatypes_map(datatypes_filename);

		/* Now connect to clickhouse */
		clickhouse::Client client(client_options);

		CallbackData callback_data;
		callback_data.datatypes_map = get_datatypes_map();
		callback_data.datatype_regexps_map = get_datatype_regexps_map();
		callback_data.type_creation_map = get_type_creation_map();
		callback_data.clickhouse_client = &client;
		callback_data.table_name = table_name;

		{
			std::stringstream str;

			str << "CREATE TABLE IF NOT EXISTS " << table_name << " (record_time DateTime, record_milli UInt64, record_serial UInt64, record_node String";

			for (auto iter = callback_data.datatypes_map.begin(); iter != callback_data.datatypes_map.end(); ++iter)
			{
				str << ", " << construct_clickhouse_datatype_string(iter->first, iter->second);
				callback_data.all_fields_set.insert(iter->first);
			}

			for (auto iter = callback_data.datatype_regexps_map.begin(); iter != callback_data.datatype_regexps_map.end(); ++iter)
			{
				str << ", " << construct_clickhouse_datatype_string(std::get<2>(*iter), std::get<1>(*iter));
				callback_data.all_fields_set.insert(std::get<2>(*iter));
			}

			str << ") ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)";

			client.Execute(str.str());
		}
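
		// The generated statement has roughly this shape (field columns depend on the
		// configured datatypes; "auid" and "exe" are hypothetical examples):
		//
		//   CREATE TABLE IF NOT EXISTS AuditData (record_time DateTime, record_milli UInt64,
		//       record_serial UInt64, record_node String,
		//       auid_IntValue Nullable(Int64), auid_InterpretedValue LowCardinality(Nullable(String)),
		//       exe Nullable(String))
		//   ENGINE = MergeTree ORDER BY (record_time, record_milli, record_serial, record_node)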

		std::unique_ptr<auparse_state_t, void (*)(auparse_state_t*)> au(auparse_init(AUSOURCE_FEED, 0), [](auparse_state_t *obj) {
			if (obj)
			{
				auparse_flush_feed(obj);
				auparse_destroy(obj);
			}
		});

		if (!au)
		{
			throw std::runtime_error("Failed to initialize audit");
		}

		auparse_add_callback(au.get(), auparse_callback, &callback_data, NULL);

		std::vector<char> data;
		data.resize(buffer_size);

		struct pollfd pollfds[2];
		pollfds[0].fd = STDIN_FILENO;
		pollfds[1].fd = runpipes[0];

		while (running)
		{
			pollfds[0].events = POLLIN | POLLHUP;
			pollfds[0].revents = 0;
			pollfds[1].events = POLLIN | POLLHUP;
			pollfds[1].revents = 0;

			int res = poll(pollfds, sizeof(pollfds) / sizeof(pollfds[0]), -1);
			if (res < 0)
			{
				// error occurred, finish running
				fprintf(stderr, "Poll returned error: %d\n", errno);
				running = false;
			}
			else if (res == 0)
			{
				// timeout, do nothing
			}
			else if (pollfds[0].revents & POLLIN)
			{
				ssize_t readsize = read(STDIN_FILENO, data.data(), data.size());
				if (readsize > 0)
				{
					auparse_feed(au.get(), data.data(), readsize);
				}
				else
				{
					// error occurred, finish running
					fprintf(stderr, "Read returned error: %d\n", errno);
					running = false;
				}
			}
			else if ((pollfds[0].revents & POLLHUP) || (pollfds[0].revents & POLLERR))
			{
				// stdin closed, no more data, finish running
				running = false;
			}
		}
	}
	catch (const std::exception &e)
	{
		fprintf(stderr, "Caught exception: %s\n", e.what());
		return -1;
	}
	catch (...)
	{
		fprintf(stderr, "Caught unknown exception\n");
		return -1;
	}

	return 0;
}