2017-04-19 21:44:31 +03:00
/* -------------------------------------------------------------------------- */
2018-01-02 20:27:37 +03:00
/* Copyright 2002-2018, OpenNebula Project, OpenNebula Systems */
2017-04-19 21:44:31 +03:00
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
# ifndef LOG_DB_H_
# define LOG_DB_H_
# include <string>
# include <sstream>
2017-06-29 20:47:56 +03:00
# include <set>
2017-04-19 21:44:31 +03:00
# include "SqlDB.h"
2017-04-27 02:03:44 +03:00
/**
* This class represents a log record
*/
2017-05-04 23:56:07 +03:00
class LogDBRecord : public Callbackable
2017-04-27 02:03:44 +03:00
{
2017-05-04 23:56:07 +03:00
public :
2017-04-27 02:03:44 +03:00
/**
* Index for this log entry ( and previous )
*/
unsigned int index ;
unsigned int prev_index ;
/**
* Term where this log ( and previous ) entry was generated
*/
unsigned int term ;
unsigned int prev_term ;
/**
* SQL command to exec in the DB to update ( INSERT , REPLACE , DROP )
*/
std : : string sql ;
/**
* Time when the record has been applied to DB . 0 if not applied
*/
time_t timestamp ;
2017-05-04 23:56:07 +03:00
2017-06-29 20:47:56 +03:00
/**
* The index in the federation , - 1 if the log entry is not federated .
* At master fed_index is equal to index .
*/
int fed_index ;
2017-05-04 23:56:07 +03:00
/**
* Sets callback to load register from DB
*/
void set_callback ( )
{
Callbackable : : set_callback (
static_cast < Callbackable : : Callback > ( & LogDBRecord : : select_cb ) ) ;
}
private :
/**
* SQL callback to load logDBRecord from DB ( SELECT commands )
*/
2017-06-01 01:52:20 +03:00
int select_cb ( void * nil , int num , char * * values , char * * names ) ;
2017-04-27 02:03:44 +03:00
} ;
/**
* This class implements a generic DB interface with replication . The associated
* DB stores a log to replicate on followers .
*/
2017-06-29 20:47:56 +03:00
class LogDB : public SqlDB , Callbackable
2017-04-19 21:44:31 +03:00
{
public :
2017-05-30 17:16:03 +03:00
LogDB ( SqlDB * _db , bool solo , unsigned int log_retention ) ;
2017-04-21 17:52:54 +03:00
2017-04-25 12:49:52 +03:00
virtual ~ LogDB ( ) ;
2017-04-19 21:44:31 +03:00
2017-04-27 02:03:44 +03:00
// -------------------------------------------------------------------------
// Interface to access Log records
// -------------------------------------------------------------------------
2017-04-20 17:13:41 +03:00
/**
2017-04-27 02:03:44 +03:00
* Loads a log record from the database . Memory is allocated by this class
* and needs to be freed .
2017-04-20 17:13:41 +03:00
* @ param index of the associated logDB entry
2017-05-04 23:56:07 +03:00
* @ param lr logDBrecored to load from the DB
* @ return 0 on success - 1 otherwise
2017-04-27 02:03:44 +03:00
*/
2017-05-04 23:56:07 +03:00
int get_log_record ( unsigned int index , LogDBRecord & lr ) ;
2017-04-27 02:03:44 +03:00
/**
* Applies the SQL command of the given record to the database . The
* timestamp of the record is updated .
* @ param index of the log record
*/
2017-04-27 13:42:09 +03:00
int apply_log_records ( unsigned int commit_index ) ;
/**
* Deletes the record in start_index and all that follow it
* @ param start_index first log record to delete
*/
int delete_log_records ( unsigned int start_index ) ;
2017-04-27 02:03:44 +03:00
/**
2017-05-03 00:43:18 +03:00
* Inserts a new log record in the database . This method should be used
2017-05-07 00:36:08 +03:00
* in FOLLOWER mode to replicate leader log .
2017-04-28 20:35:57 +03:00
* @ param index for the record
2017-04-27 02:03:44 +03:00
* @ param term for the record
* @ param sql command of the record
* @ param timestamp associated to this record
2017-06-29 20:47:56 +03:00
* @ param fed_index index in the federation - 1 if not federated
2017-04-20 17:13:41 +03:00
*
2017-04-27 02:03:44 +03:00
* @ return - 1 on failure , index of the inserted record on success
2017-04-20 17:13:41 +03:00
*/
2017-04-28 20:35:57 +03:00
int insert_log_record ( unsigned int index , unsigned int term ,
2017-06-29 20:47:56 +03:00
std : : ostringstream & sql , time_t timestamp , int fed_index ) ;
2017-05-03 00:43:18 +03:00
//--------------------------------------------------------------------------
// Functions to manage the Raft state. Log record 0, term -1
// -------------------------------------------------------------------------
/**
* Stores the raft state in the log
* @ param raft attributes in XML format
* @ return 0 on success
*/
2017-06-01 12:13:12 +03:00
int update_raft_state ( std : : string & raft_xml ) ;
2017-04-19 21:44:31 +03:00
2017-05-03 00:43:18 +03:00
/**
* Returns the raft state attributes as stored in the log
* @ param raft_xml attributes in xml
* @ return 0 on success
*/
int get_raft_state ( std : : string & raft_xml ) ;
2017-04-30 00:25:53 +03:00
/**
* Purge log records . Delete old records applied to database upto the
* LOG_RETENTION configuration variable .
* @ return 0 on success
*/
int purge_log ( ) ;
2017-04-20 17:13:41 +03:00
// -------------------------------------------------------------------------
// SQL interface
// -------------------------------------------------------------------------
/**
* This function replicates the DB changes on followers before updating
* the DB state
*/
2017-06-29 20:47:56 +03:00
int exec_wr ( ostringstream & cmd )
{
return _exec_wr ( cmd , - 1 ) ;
}
int exec_federated_wr ( ostringstream & cmd )
{
return _exec_wr ( cmd , 0 ) ;
}
int exec_federated_wr ( ostringstream & cmd , int index )
{
return _exec_wr ( cmd , index ) ;
}
2017-04-19 21:44:31 +03:00
2017-04-21 20:16:45 +03:00
int exec_local_wr ( ostringstream & cmd )
2017-04-20 17:13:41 +03:00
{
2017-04-21 20:16:45 +03:00
return db - > exec_local_wr ( cmd ) ;
2017-04-20 17:13:41 +03:00
}
2017-04-19 21:44:31 +03:00
2017-04-21 17:52:54 +03:00
int exec_rd ( ostringstream & cmd , Callbackable * obj )
2017-04-20 17:13:41 +03:00
{
return db - > exec_rd ( cmd , obj ) ;
2017-04-19 21:44:31 +03:00
}
char * escape_str ( const string & str )
{
return db - > escape_str ( str ) ;
}
void free_str ( char * str )
{
db - > free_str ( str ) ;
}
bool multiple_values_support ( )
{
return db - > multiple_values_support ( ) ;
}
2017-04-21 17:52:54 +03:00
// -------------------------------------------------------------------------
// Database methods
// -------------------------------------------------------------------------
2017-08-09 11:45:01 +03:00
static int bootstrap ( SqlDB * _db ) ;
2017-04-21 17:52:54 +03:00
2017-04-27 02:03:44 +03:00
/**
2017-04-28 20:35:57 +03:00
* This function gets and initialize log related index
2017-04-27 02:03:44 +03:00
* @ param last_applied , highest index applied to the DB
* @ param last_index
*
* @ return 0 on success
*/
int setup_index ( int & last_applied , int & last_index ) ;
2017-04-28 20:35:57 +03:00
/**
2017-05-04 23:56:07 +03:00
* Gets the index & term of the last record in the log
2017-05-03 00:43:18 +03:00
* @ param _i the index
* @ param _t the term
2017-04-28 20:35:57 +03:00
*/
2017-05-07 00:36:08 +03:00
void get_last_record_index ( unsigned int & _i , unsigned int & _t ) ;
2017-04-28 20:35:57 +03:00
2017-06-29 20:47:56 +03:00
// -------------------------------------------------------------------------
// Federate log methods
// -------------------------------------------------------------------------
/**
* Get last federated index , and previous
*/
int last_federated ( ) ;
int previous_federated ( int index ) ;
int next_federated ( int index ) ;
2017-04-19 21:44:31 +03:00
protected :
2017-04-27 02:03:44 +03:00
int exec ( std : : ostringstream & cmd , Callbackable * obj , bool quiet )
2017-04-19 21:44:31 +03:00
{
2017-04-20 17:13:41 +03:00
return - 1 ;
2017-04-19 21:44:31 +03:00
}
private :
2017-04-21 17:52:54 +03:00
pthread_mutex_t mutex ;
2017-04-28 20:35:57 +03:00
/**
* The Database was started in solo mode ( no server_id defined )
*/
bool solo ;
2017-04-19 21:44:31 +03:00
/**
* Pointer to the underlying DB store
*/
SqlDB * db ;
/**
* Index to be used by the next logDB record
*/
unsigned int next_index ;
2017-04-20 17:13:41 +03:00
/**
2017-04-27 02:03:44 +03:00
* Index of the last log entry applied to the DB state
2017-04-20 17:13:41 +03:00
*/
2017-04-27 02:03:44 +03:00
unsigned int last_applied ;
2017-04-20 17:13:41 +03:00
2017-05-07 00:36:08 +03:00
/**
* Index of the last ( highest ) log entry
*/
unsigned int last_index ;
/**
* term of the last ( highest ) log entry
*/
unsigned int last_term ;
2017-04-30 00:25:53 +03:00
/**
* Max number of records to keep in the database
*/
2017-05-30 17:16:03 +03:00
unsigned int log_retention ;
2017-04-30 00:25:53 +03:00
2017-06-29 20:47:56 +03:00
// -------------------------------------------------------------------------
// Federated Log
// -------------------------------------------------------------------------
/**
* The federated log stores a map with the federated log index and its
* corresponding local index . For the master both are the same
*/
std : : set < int > fed_log ;
/**
* Generates the federated index , it should be called whenever a server
* takes leadership .
*/
void build_federated_index ( ) ;
2017-04-20 17:13:41 +03:00
// -------------------------------------------------------------------------
// DataBase implementation
// -------------------------------------------------------------------------
static const char * table ;
static const char * db_names ;
static const char * db_bootstrap ;
2017-06-29 20:47:56 +03:00
/**
* Replicates writes in the followers and apply changes to DB state once
* it is safe to do so .
*
* @ param federated - 1 not federated ( fed_index = - 1 ) , 0 generate fed index
* ( fed_index = index ) , > 0 set ( fed_index = federated )
*/
int _exec_wr ( ostringstream & cmd , int federated ) ;
/**
* Callback to store the IDs of federated records in the federated log .
*/
int index_cb ( void * null , int num , char * * values , char * * names ) ;
2017-06-01 01:52:20 +03:00
/**
* Applies the SQL command of the given record to the database . The
* timestamp of the record is updated .
* @ param lr the log record
*/
int apply_log_record ( LogDBRecord * lr ) ;
2017-04-20 17:13:41 +03:00
/**
* Inserts or update a log record in the database
2017-04-21 23:32:30 +03:00
* @ param index of the log entry
* @ param term for the log entry
* @ param sql command to modify DB state
2017-05-05 16:46:57 +03:00
* @ param ts timestamp of record application to DB state
2017-06-29 20:47:56 +03:00
* @ param fi the federated index - 1 if none
2017-04-20 17:13:41 +03:00
*
* @ return 0 on success
*/
2017-06-29 20:47:56 +03:00
int insert ( int index , int term , const std : : string & sql , time_t ts , int fi ) ;
2017-04-28 20:35:57 +03:00
/**
* Inserts a new log record in the database . If the record is successfully
* inserted the index is incremented
* @ param term for the record
* @ param sql command of the record
* @ param timestamp associated to this record
2017-06-29 20:47:56 +03:00
* @ param federated , if true it will set fed_index = = index , - 1 otherwise
2017-04-28 20:35:57 +03:00
*
* @ return - 1 on failure , index of the inserted record on success
*/
int insert_log_record ( unsigned int term , std : : ostringstream & sql ,
2017-06-29 20:47:56 +03:00
time_t timestamp , int federated ) ;
2017-04-19 21:44:31 +03:00
} ;
2017-05-16 13:01:06 +03:00
// -----------------------------------------------------------------------------
// This is a LogDB decoration, it replicates the DB write commands on slaves
// It should be passed as DB for federated pools.
// -----------------------------------------------------------------------------
class FedLogDB : public SqlDB
{
public :
FedLogDB ( LogDB * db ) : _logdb ( db ) { } ;
virtual ~ FedLogDB ( ) { } ;
int exec_wr ( ostringstream & cmd ) ;
int exec_local_wr ( ostringstream & cmd )
{
return _logdb - > exec_local_wr ( cmd ) ;
}
int exec_rd ( ostringstream & cmd , Callbackable * obj )
{
return _logdb - > exec_rd ( cmd , obj ) ;
}
char * escape_str ( const string & str )
{
return _logdb - > escape_str ( str ) ;
}
void free_str ( char * str )
{
_logdb - > free_str ( str ) ;
}
bool multiple_values_support ( )
{
return _logdb - > multiple_values_support ( ) ;
}
protected :
int exec ( std : : ostringstream & cmd , Callbackable * obj , bool quiet )
{
return - 1 ;
}
private :
LogDB * _logdb ;
2017-05-16 13:21:55 +03:00
} ;
2017-05-16 13:01:06 +03:00
2017-04-19 21:44:31 +03:00
# endif /*LOG_DB_H_*/