1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-02-04 17:47:00 +03:00

F #4809: Re-design replicated log structure

This commit is contained in:
Ruben S. Montero 2017-06-29 19:47:56 +02:00
parent 35a67abf3b
commit 87b5e5cb5b
8 changed files with 287 additions and 352 deletions

View File

@ -26,38 +26,37 @@
extern "C" void * frm_loop(void *arg);
class SqlDB;
class LogDB;
class LogDBRecord;
class FedReplicaManager : public ReplicaManager, ActionListener
{
public:
/**
* @param _t timer for periofic actions (sec)
* @param _p purge timeout for log
* @param d pointer to underlying DB (LogDB)
* @param l log_retention length (num records)
*/
FedReplicaManager(time_t _t, time_t _p, SqlDB * d, unsigned int l);
FedReplicaManager(LogDB * d);
virtual ~FedReplicaManager();
/**
* Creates a new record in the federation log and sends the replication
* event to the replica threads. [MASTER]
* @param sql db command to replicate
* @return 0 on success -1 otherwise
* Sends the replication event to the replica threads. [MASTER]
*/
int replicate(const std::string& sql);
void replicate(const std::string& sql)
{
ReplicaManager::replicate();
}
/**
* Updates the current index in the server and applies the command to the
* server. It also stores the record in the zone log [SLAVE]
* @param index of the record
* @param prev of index preceding this one
* @param sql command to apply to DB
* @return 0 on success, last_index if missing records, -1 on DB error
*/
int apply_log_record(int index, const std::string& sql);
int apply_log_record(int index, int prev, const std::string& sql);
/**
* Record was successfully replicated on zone, increase next index and
@ -106,8 +105,6 @@ public:
update_zones(zids);
get_last_index(last_index);
ReplicaManager::start_replica_threads(zids);
}
@ -133,11 +130,6 @@ public:
*/
void delete_zone(int zone_id);
/**
* Bootstrap federated log
*/
static int bootstrap(SqlDB *_db);
/**
* @return the id of fed. replica thread
*/
@ -146,19 +138,6 @@ public:
return frm_thread;
};
/**
* @return the last index of the fed log (from DB to use this method in
* HA followers)
*/
unsigned int get_last_index() const
{
unsigned int li;
get_last_index(li);
return li;
}
private:
friend void * frm_loop(void *arg);
@ -177,29 +156,19 @@ private:
*/
pthread_mutex_t mutex;
//--------------------------------------------------------------------------
// Timers
// - timer_period. Base timer to wake up the manager
// - purge_period. How often the replicated log is purged (600s)
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
//--------------------------------------------------------------------------
time_t timer_period;
time_t purge_period;
static const time_t xmlrpc_timeout_ms;
// -------------------------------------------------------------------------
// Synchronization variables
// - last_index in the replication log
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
// - zones list of zones in the federation with:
// - list of servers <id, xmlrpc endpoint>
// - next index to send to this zone
// -------------------------------------------------------------------------
static const time_t xmlrpc_timeout_ms;
struct ZoneServers
{
ZoneServers(int z, unsigned int l, const std::string& s):
zone_id(z), endpoint(s), next(l){};
zone_id(z), endpoint(s), next(l), last(-1){};
~ZoneServers(){};
@ -207,16 +176,14 @@ private:
std::string endpoint;
unsigned int next;
int next;
int last;
};
std::map<int, ZoneServers *> zones;
unsigned int last_index;
SqlDB * logdb;
unsigned int log_retention;
LogDB * logdb;
// -------------------------------------------------------------------------
// Action Listener interface
@ -228,45 +195,6 @@ private:
*/
void finalize_action(const ActionRequest& ar);
/**
* This function is executed periodically to purge the state log
*/
void timer_action(const ActionRequest& ar);
// -------------------------------------------------------------------------
// Database Implementation
// Store log records to replicate on federation slaves
// -------------------------------------------------------------------------
static const char * table;
static const char * db_names;
static const char * db_bootstrap;
/**
* Gets a record from the log
* @param index of the record
* @param sql command of the record
* @return 0 in case of success -1 otherwise
*/
int get_log_record(int index, std::string& sql);
/**
* Inserts a new record in the log ans updates the last_index variable
* (memory and db)
* @param index of new record
* @param sql of DB command to execute
* @return 0 on success
*/
int insert_log_record(int index, const std::string& sql);
/**
* Reads the last index from DB for initialization
* @param index
* @return 0 on success
*/
int get_last_index(unsigned int& index) const;
/**
* Get the nest record to replicate in a zone
* @param zone_id of the zone
@ -274,8 +202,8 @@ private:
* @param sql command to replicate
* @return 0 on success, -1 otherwise
*/
int get_next_record(int zone_id, int& index, std::string& sql,
std::string& zservers);
int get_next_record(int zone_id, std::string& zedp, LogDBRecord& lr);
};
#endif /*FED_REPLICA_MANAGER_H_*/

View File

@ -19,6 +19,7 @@
#include <string>
#include <sstream>
#include <set>
#include "SqlDB.h"
@ -52,6 +53,12 @@ public:
*/
time_t timestamp;
/**
* The index in the federation, -1 if the log entry is not federated.
* At master fed_index is equal to index.
*/
int fed_index;
/**
* Sets callback to load register from DB
*/
@ -72,7 +79,7 @@ private:
* This class implements a generic DB interface with replication. The associated
* DB stores a log to replicate on followers.
*/
class LogDB : public SqlDB
class LogDB : public SqlDB, Callbackable
{
public:
LogDB(SqlDB * _db, bool solo, unsigned int log_retention);
@ -111,11 +118,12 @@ public:
* @param term for the record
* @param sql command of the record
* @param timestamp associated to this record
* @param fed_index index in the federation -1 if not federated
*
* @return -1 on failure, index of the inserted record on success
*/
int insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp);
std::ostringstream& sql, time_t timestamp, int fed_index);
//--------------------------------------------------------------------------
// Functions to manage the Raft state. Log record 0, term -1
@ -148,7 +156,20 @@ public:
* This function replicates the DB changes on followers before updating
* the DB state
*/
int exec_wr(ostringstream& cmd);
int exec_wr(ostringstream& cmd)
{
return _exec_wr(cmd, -1);
}
int exec_federated_wr(ostringstream& cmd)
{
return _exec_wr(cmd, 0);
}
int exec_federated_wr(ostringstream& cmd, int index)
{
return _exec_wr(cmd, index);
}
int exec_local_wr(ostringstream& cmd)
{
@ -201,6 +222,18 @@ public:
*/
void get_last_record_index(unsigned int& _i, unsigned int& _t);
// -------------------------------------------------------------------------
// Federate log methods
// -------------------------------------------------------------------------
/**
* Get last federated index, and previous
*/
int last_federated();
int previous_federated(int index);
int next_federated(int index);
protected:
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
{
@ -245,6 +278,21 @@ private:
*/
unsigned int log_retention;
// -------------------------------------------------------------------------
// Federated Log
// -------------------------------------------------------------------------
/**
* The federated log stores a map with the federated log index and its
* corresponding local index. For the master both are the same
*/
std::set<int> fed_log;
/**
* Generates the federated index, it should be called whenever a server
* takes leadership.
*/
void build_federated_index();
// -------------------------------------------------------------------------
// DataBase implementation
// -------------------------------------------------------------------------
@ -254,6 +302,20 @@ private:
static const char * db_bootstrap;
/**
* Replicates writes in the followers and apply changes to DB state once
* it is safe to do so.
*
* @param federated -1 not federated (fed_index = -1), 0 generate fed index
* (fed_index = index), > 0 set (fed_index = federated)
*/
int _exec_wr(ostringstream& cmd, int federated);
/**
* Callback to store the IDs of federated records in the federated log.
*/
int index_cb(void *null, int num, char **values, char **names);
/**
* Applies the SQL command of the given record to the database. The
* timestamp of the record is updated.
@ -267,10 +329,11 @@ private:
* @param term for the log entry
* @param sql command to modify DB state
* @param ts timestamp of record application to DB state
* @param fi the federated index -1 if none
*
* @return 0 on success
*/
int insert(int index, int term, const std::string& sql, time_t ts);
int insert(int index, int term, const std::string& sql, time_t ts, int fi);
/**
* Inserts a new log record in the database. If the record is successfully
@ -278,11 +341,12 @@ private:
* @param term for the record
* @param sql command of the record
* @param timestamp associated to this record
* @param federated, if true it will set fed_index == index, -1 otherwise
*
* @return -1 on failure, index of the inserted record on success
*/
int insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp);
time_t timestamp, int federated);
};
// -----------------------------------------------------------------------------

View File

@ -386,7 +386,6 @@ void Nebula::start(bool bootstrap_only)
rc += SecurityGroupPool::bootstrap(logdb);
rc += VirtualRouterPool::bootstrap(logdb);
rc += VMGroupPool::bootstrap(logdb);
rc += FedReplicaManager::bootstrap(logdb);
// Create the system tables only if bootstrap went well
if (rc == 0)
@ -743,7 +742,7 @@ void Nebula::start(bool bootstrap_only)
// ---- FedReplica Manager ----
try
{
frm = new FedReplicaManager(timer_period,log_purge,logdb,log_retention);
frm = new FedReplicaManager(logdb);
}
catch (bad_alloc&)
{

View File

@ -22,27 +22,16 @@
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
const char * FedReplicaManager::table = "fed_logdb";
const char * FedReplicaManager::db_names = "log_index, sqlcmd";
const char * FedReplicaManager::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
"fed_logdb (log_index INTEGER PRIMARY KEY, sqlcmd MEDIUMTEXT)";
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d,
unsigned int l): ReplicaManager(), timer_period(_t), purge_period(_p),
last_index(-1), logdb(d), log_retention(l)
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
{
pthread_mutex_init(&mutex, 0);
am.addListener(this);
get_last_index(last_index);
};
/* -------------------------------------------------------------------------- */
@ -69,35 +58,16 @@ FedReplicaManager::~FedReplicaManager()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::replicate(const std::string& sql)
{
pthread_mutex_lock(&mutex);
if ( insert_log_record(last_index+1, sql) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
last_index++;
pthread_mutex_unlock(&mutex);
ReplicaManager::replicate();
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::apply_log_record(int index, const std::string& sql)
int FedReplicaManager::apply_log_record(int index, int prev,
const std::string& sql)
{
int rc;
pthread_mutex_lock(&mutex);
if ( (unsigned int) index != last_index + 1 )
int last_index = logdb->last_federated();
if ( prev != last_index )
{
rc = last_index;
@ -105,40 +75,14 @@ int FedReplicaManager::apply_log_record(int index, const std::string& sql)
return rc;
}
std::ostringstream oss;
std::ostringstream oss(sql);
std::string * zsql = one_util::zlib_compress(sql, true);
if ( zsql == 0 )
if ( logdb->exec_federated_wr(oss, index) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
char * sql_db = logdb->escape_str(zsql->c_str());
delete zsql;
if ( sql_db == 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
oss << "BEGIN;\n"
<< "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
<< "(" << last_index + 1 << ",'" << sql_db << "');\n"
<< sql << ";\n"
<< "END;";
if ( logdb->exec_wr(oss) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
last_index++;
pthread_mutex_unlock(&mutex);
return 0;
@ -160,7 +104,7 @@ extern "C" void * frm_loop(void *arg)
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started.");
fedrm->am.loop(fedrm->timer_period);
fedrm->am.loop();
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped.");
@ -210,6 +154,8 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
pthread_mutex_lock(&mutex);
int last_index = logdb->last_federated();
zones.clear();
for (it = zone_ids.begin() ; it != zone_ids.end(); )
@ -271,6 +217,8 @@ void FedReplicaManager::add_zone(int zone_id)
pthread_mutex_lock(&mutex);
int last_index = logdb->last_federated();
ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
zones.insert(make_pair(zone_id, zs));
@ -325,58 +273,8 @@ ReplicaThread * FedReplicaManager::thread_factory(int zone_id)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void FedReplicaManager::timer_action(const ActionRequest& ar)
{
static int mark_tics = 0;
static int purge_tics = 0;
mark_tics++;
purge_tics++;
// Thread heartbeat
if ( (mark_tics * timer_period) >= 600 )
{
NebulaLog::log("FRM",Log::INFO,"--Mark--");
mark_tics = 0;
}
// Database housekeeping
if ( (purge_tics * timer_period) >= purge_period )
{
Nebula& nd = Nebula::instance();
RaftManager * raftm = nd.get_raftm();
if ( raftm->is_leader() || raftm->is_solo() )
{
NebulaLog::log("FRM", Log::INFO, "Purging federated log");
std::ostringstream oss;
pthread_mutex_lock(&mutex);
if ( last_index > log_retention )
{
unsigned int delete_index = last_index - log_retention;
// keep the last "log_retention" records
oss << "DELETE FROM fed_logdb WHERE log_index >= 0 AND "
<< "log_index < " << delete_index;
logdb->exec_wr(oss);
}
pthread_mutex_unlock(&mutex);
}
purge_tics = 0;
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_next_record(int zone_id, int& index,
std::string& sql, std::string& zedp)
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
LogDBRecord& lr)
{
pthread_mutex_lock(&mutex);
@ -388,114 +286,28 @@ int FedReplicaManager::get_next_record(int zone_id, int& index,
return -1;
}
index = it->second->next;
zedp = it->second->endpoint;
ZoneServers * zs = it->second;
int rc = get_log_record(index, sql);
zedp = zs->endpoint;
if ( zs->next == -1 )
{
zs->next= logdb->last_federated();
}
if ( zs->last == zs->next )
{
pthread_mutex_unlock(&mutex);
return -1;
}
int rc = logdb->get_log_record(zs->next, lr);
pthread_mutex_unlock(&mutex);
return rc;
}
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_log_record(int index, std::string& sql)
{
std::string zsql;
ostringstream oss;
single_cb<std::string> cb;
oss << "SELECT sqlcmd FROM fed_logdb WHERE log_index = " << index;
cb.set_callback(&zsql);
int rc = logdb->exec_rd(oss, &cb);
cb.unset_callback();
std::string * _sql = one_util::zlib_decompress(zsql, true);
if ( _sql == 0 )
{
return -1;
}
sql = *_sql;
delete _sql;
return rc;
}
/* -------------------------------------------------------------------------- */
int FedReplicaManager::insert_log_record(int index, const std::string& sql)
{
std::ostringstream oss;
std::string * zsql = one_util::zlib_compress(sql, true);
if ( zsql == 0 )
{
return -1;
}
char * sql_db = logdb->escape_str(zsql->c_str());
delete zsql;
if ( sql_db == 0 )
{
return -1;
}
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
<< "(" << index << ",'" << sql_db << "')";
return logdb->exec_wr(oss);
}
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_last_index(unsigned int& index) const
{
ostringstream oss;
single_cb<unsigned int> cb;
oss << "SELECT MAX(log_index) FROM fed_logdb";
cb.set_callback(&index);
int rc = logdb->exec_rd(oss, &cb);
cb.unset_callback();
return rc;
}
/* -------------------------------------------------------------------------- */
int FedReplicaManager::bootstrap(SqlDB *_db)
{
int rc;
std::ostringstream oss(db_bootstrap);
rc = _db->exec_local_wr(oss);
oss.str("");
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (-1,-1)";
rc += _db->exec_local_wr(oss);
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -513,9 +325,11 @@ void FedReplicaManager::replicate_success(int zone_id)
ZoneServers * zs = it->second;
zs->next++;
zs->last = zs->next;
if ( last_index >= zs->next )
zs->next = logdb->next_federated(zs->next);
if ( zs->next != -1 )
{
ReplicaManager::replicate(zone_id);
}
@ -537,10 +351,12 @@ void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
if ( last_zone >= 0 )
{
zs->next = last_zone + 1;
zs->last = last_zone;
zs->next = logdb->next_federated(zs->last);
}
if ( last_index >= zs->next )
if ( zs->next != -1 )
{
ReplicaManager::replicate(zone_id);
}
@ -558,18 +374,20 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
{
static const std::string replica_method = "one.zone.fedreplicate";
int index;
std::string sql, secret, zedp;
std::string secret, zedp;
int xml_rc = 0;
if ( get_next_record(zone_id, index, sql, zedp) != 0 )
LogDBRecord lr;
if ( get_next_record(zone_id, zedp, lr) != 0 )
{
error = "Failed to load federation log record";
return -1;
}
int prev_index = logdb->previous_federated(lr.index);
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
// -------------------------------------------------------------------------
@ -582,8 +400,9 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
xmlrpc_c::paramList replica_params;
replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(index));
replica_params.add(xmlrpc_c::value_string(sql));
replica_params.add(xmlrpc_c::value_int(lr.index));
replica_params.add(xmlrpc_c::value_int(prev_index));
replica_params.add(xmlrpc_c::value_string(lr.sql));
// -------------------------------------------------------------------------
// Do the XML-RPC call
@ -612,7 +431,7 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
{
std::ostringstream ess;
ess << "Error replicating log entry " << index << " on zone "
ess << "Error replicating log entry " << lr.index << " on zone "
<< zone_id << " (" << zedp << "): " << error;
NebulaLog::log("FRM", Log::ERROR, error);

View File

@ -74,7 +74,7 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
bsr << "bootstrap state";
logdb->insert_log_record(-1, -1, bsr, 0);
logdb->insert_log_record(-1, -1, bsr, 0, -1);
raft_state.replace("TERM", 0);
raft_state.replace("VOTEDFOR", -1);
@ -1038,6 +1038,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
replica_params.add(xmlrpc_c::value_int(lr->term));
replica_params.add(xmlrpc_c::value_int(lr->prev_index));
replica_params.add(xmlrpc_c::value_int(lr->prev_term));
replica_params.add(xmlrpc_c::value_int(lr->fed_index));
replica_params.add(xmlrpc_c::value_string(lr->sql));
// -------------------------------------------------------------------------
@ -1176,8 +1177,6 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
FedReplicaManager * frm = nd.get_frm();
unsigned int lindex, lterm;
std::ostringstream oss;
@ -1206,7 +1205,7 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
if ( nd.is_federation_enabled() )
{
oss << "<FEDLOG_INDEX>" << frm->get_last_index() << "</FEDLOG_INDEX>";
oss << "<FEDLOG_INDEX>" << logdb->last_federated() << "</FEDLOG_INDEX>";
}
else
{

View File

@ -304,6 +304,7 @@ int HeartBeatThread::replicate()
lr.sql = "";
lr.timestamp = 0;
lr.fed_index = -1;
rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);

View File

@ -273,8 +273,9 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
unsigned int term = xmlrpc_c::value_int(paramList.getInt(5));
unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7));
unsigned int fed_index = xmlrpc_c::value_int(paramList.getInt(8));
string sql = xmlrpc_c::value_string(paramList.getString(8));
string sql = xmlrpc_c::value_string(paramList.getString(9));
unsigned int current_term = raftm->get_term();
@ -392,7 +393,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
ostringstream sql_oss(sql);
if ( logdb->insert_log_record(index, term, sql_oss, 0) != 0 )
if ( logdb->insert_log_record(index, term, sql_oss, 0, fed_index) != 0 )
{
att.resp_msg = "Error writing log record";
att.resp_id = current_term;
@ -518,7 +519,8 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
FedReplicaManager * frm = nd.get_frm();
int index = xmlrpc_c::value_int(paramList.getInt(1));
string sql = xmlrpc_c::value_string(paramList.getString(2));
int prev = xmlrpc_c::value_int(paramList.getInt(2));
string sql = xmlrpc_c::value_string(paramList.getString(3));
if ( att.uid != 0 )
{
@ -554,7 +556,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
int rc = frm->apply_log_record(index, sql);
int rc = frm->apply_log_record(index, prev, sql);
if ( rc == 0 )
{

View File

@ -25,11 +25,11 @@
const char * LogDB::table = "logdb";
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp";
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
"timestamp INTEGER)";
"timestamp INTEGER, fed_index INTEGER)";
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -37,7 +37,7 @@ const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
{
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
!values[4] || !values[5] || num != 6 )
!values[4] || !values[5] || !values[6] || num != 7 )
{
return -1;
}
@ -52,8 +52,10 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
timestamp = static_cast<unsigned int>(atoi(values[3]));
prev_index = static_cast<unsigned int>(atoi(values[4]));
prev_term = static_cast<unsigned int>(atoi(values[5]));
fed_index = static_cast<unsigned int>(atoi(values[4]));
prev_index = static_cast<unsigned int>(atoi(values[5]));
prev_term = static_cast<unsigned int>(atoi(values[6]));
_sql = one_util::zlib_decompress(zsql, true);
@ -88,7 +90,7 @@ LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db),
oss << time(0);
insert_log_record(0, 0, oss, time(0));
insert_log_record(0, 0, oss, time(0), false);
}
setup_index(r, i);
@ -153,6 +155,8 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
last_term = lr.term;
}
build_federated_index();
pthread_mutex_unlock(&mutex);
return rc;
@ -175,7 +179,7 @@ int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
lr.index = index + 1;
oss << "SELECT c.log_index, c.term, c.sqlcmd,"
<< " c.timestamp, p.log_index, p.term"
<< " c.timestamp, c.fed_index, p.log_index, p.term"
<< " FROM logdb c, logdb p WHERE c.log_index = " << index
<< " AND p.log_index = " << prev_index;
@ -255,7 +259,8 @@ int LogDB::update_raft_state(std::string& raft_xml)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
int fed_index)
{
std::ostringstream oss;
@ -278,7 +283,8 @@ int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
}
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")";
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp
<< "," << fed_index << ")";
int rc = db->exec_wr(oss);
@ -336,13 +342,24 @@ int LogDB::apply_log_record(LogDBRecord * lr)
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp)
time_t timestamp, int fed_index)
{
pthread_mutex_lock(&mutex);
unsigned int index = next_index;
if ( insert(index, term, sql.str(), timestamp) != 0 )
int _fed_index;
if ( fed_index == 0 )
{
_fed_index = index;
}
else
{
_fed_index = fed_index;
}
if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
{
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
@ -357,6 +374,11 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
next_index++;
if ( fed_index != -1 )
{
fed_log.insert(_fed_index);
}
pthread_mutex_unlock(&mutex);
return index;
@ -366,13 +388,13 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp)
std::ostringstream& sql, time_t timestamp, int fed_index)
{
int rc;
pthread_mutex_lock(&mutex);
rc = insert(index, term, sql.str(), timestamp);
rc = insert(index, term, sql.str(), timestamp, fed_index);
if ( rc == 0 )
{
@ -384,6 +406,11 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term,
next_index = last_index + 1;
}
if ( fed_index != -1 )
{
fed_log.insert(fed_index);
}
}
pthread_mutex_unlock(&mutex);
@ -394,7 +421,7 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::exec_wr(ostringstream& cmd)
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
{
int rc;
@ -416,7 +443,7 @@ int LogDB::exec_wr(ostringstream& cmd)
// -------------------------------------------------------------------------
// Insert log entry in the database and replicate on followers
// -------------------------------------------------------------------------
int rindex = insert_log_record(raftm->get_term(), cmd, 0);
int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
if ( rindex == -1 )
{
@ -544,6 +571,102 @@ int LogDB::purge_log()
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::index_cb(void *null, int num, char **values, char **names)
{
if ( num == 0 || values == 0 || values[0] == 0 )
{
return -1;
}
fed_log.insert(atoi(values[0]));
return 0;
}
void LogDB::build_federated_index()
{
std::ostringstream oss;
fed_log.clear();
set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
db->exec_rd(oss, this);
unset_callback();
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::last_federated()
{
pthread_mutex_lock(&mutex);
int findex = -1;
if ( !fed_log.empty() )
{
set<int>::reverse_iterator rit;
rit = fed_log.rbegin();
findex = *rit;
}
pthread_mutex_unlock(&mutex);
return findex;
}
/* -------------------------------------------------------------------------- */
int LogDB::previous_federated(int i)
{
set<int>::iterator it;
pthread_mutex_lock(&mutex);
int findex = -1;
it = fed_log.find(i);
if ( it != fed_log.end() && it != fed_log.begin() )
{
findex = *(--it);
}
pthread_mutex_unlock(&mutex);
return findex;
}
/* -------------------------------------------------------------------------- */
int LogDB::next_federated(int i)
{
set<int>::iterator it;
pthread_mutex_lock(&mutex);
int findex = -1;
it = fed_log.find(i);
if ( it != fed_log.end() && it != --fed_log.end() )
{
findex = *(++it);
}
pthread_mutex_unlock(&mutex);
return findex;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -551,7 +674,7 @@ int FedLogDB::exec_wr(ostringstream& cmd)
{
FedReplicaManager * frm = Nebula::instance().get_frm();
int rc = _logdb->exec_wr(cmd);
int rc = _logdb->exec_federated_wr(cmd);
if ( rc != 0 )
{