1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-02-26 09:57:23 +03:00

F #4809: Fix several issues accessing the log records

This commit is contained in:
Ruben S. Montero 2017-05-04 22:56:07 +02:00
parent c6a7500df5
commit 03c5698a72
7 changed files with 181 additions and 172 deletions

View File

@ -18,6 +18,7 @@
#define CALLBACKABLE_H_
#include <pthread.h>
#include <sstream>
using namespace std;
@ -105,4 +106,39 @@ private:
pthread_mutex_t mutex;
};
/* -------------------------------------------------------------------------- */
/* Classes to obtain values from a DB it support concurrent queries using */
/* different objects */
/* -------------------------------------------------------------------------- */
template <class T>
class single_cb : public Callbackable
{
public:
void set_callback(T * _value)
{
value = _value;
Callbackable::set_callback(
static_cast<Callbackable::Callback>(&single_cb::callback));
}
virtual int callback(void *nil, int num, char **values, char **names)
{
if ( values == 0 || values[0] == 0 || num != 1 )
{
return -1;
}
std::istringstream iss(values[0]);
iss >> *value;
return 0;
}
private:
T * value;
};
#endif /*CALLBACKABLE_H_*/

View File

@ -25,8 +25,9 @@
/**
* This class represents a log record
*/
struct LogDBRecord
class LogDBRecord : public Callbackable
{
public:
/**
* Index for this log entry (and previous)
*/
@ -50,13 +51,46 @@ struct LogDBRecord
* Time when the record has been applied to DB. 0 if not applied
*/
time_t timestamp;
/**
* Sets callback to load register from DB
*/
void set_callback()
{
Callbackable::set_callback(
static_cast<Callbackable::Callback>(&LogDBRecord::select_cb));
}
private:
/**
* SQL callback to load logDBRecord from DB (SELECT commands)
*/
int select_cb(void *nil, int num, char **values, char **names)
{
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
!values[4] || !values[5] || num != 6 )
{
return -1;
}
index = static_cast<unsigned int>(atoi(values[0]));
term = static_cast<unsigned int>(atoi(values[1]));
sql = values[2];
timestamp = static_cast<unsigned int>(atoi(values[3]));
prev_index = static_cast<unsigned int>(atoi(values[4]));
prev_term = static_cast<unsigned int>(atoi(values[5]));
return 0;
}
};
/**
* This class implements a generic DB interface with replication. The associated
* DB stores a log to replicate on followers.
*/
class LogDB : public SqlDB, Callbackable
class LogDB : public SqlDB
{
public:
LogDB(SqlDB * _db, bool solo, const std::string& log_retention);
@ -70,9 +104,10 @@ public:
* Loads a log record from the database. Memory is allocated by this class
* and needs to be freed.
* @param index of the associated logDB entry
* @return the LogDB record
* @param lr logDBrecored to load from the DB
* @return 0 on success -1 otherwise
*/
LogDBRecord * get_log_record(unsigned int index);
int get_log_record(unsigned int index, LogDBRecord& lr);
/**
* Applies the SQL command of the given record to the database. The
@ -92,8 +127,7 @@ public:
/**
* Inserts a new log record in the database. This method should be used
* in FOLLOWER mode to replicate leader log. It updates next_index and
* last_term to evaluate vote requests.
* in FOLLOWER mode to replicate leader log. It does not update counters.
* @param index for the record
* @param term for the record
* @param sql command of the record
@ -104,28 +138,12 @@ public:
int insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp)
{
int rc;
pthread_mutex_lock(&mutex);
rc = insert_replace(index, term, sql.str(), timestamp, false);
if ( rc == 0 && term >= 0 && index >= 0 )
{
next_index = index + 1;
last_term = term;
}
pthread_mutex_unlock(&mutex);
return rc;
return insert_replace(index, term, sql.str(), timestamp, false);
}
//--------------------------------------------------------------------------
// Functions to manage the Raft state. Log record 0, term -1
// -------------------------------------------------------------------------
/**
* Stores the raft state in the log
* @param raft attributes in XML format
@ -206,19 +224,13 @@ public:
int setup_index(int& last_applied, int& last_index);
/**
* Gets the index of the last record in the log
* Gets the index & term of the last record in the log
* @param _i the index
* @param _t the term
*
* @return 0 on success
*/
void get_last_record_index(unsigned int& _i, unsigned int& _t)
{
pthread_mutex_lock(&mutex);
_i = next_index - 1;
_t = last_term;
pthread_mutex_unlock(&mutex);
}
int get_last_record_index(unsigned int& _i, unsigned int& _t);
protected:
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
@ -249,11 +261,6 @@ private:
*/
unsigned int last_applied;
/**
* Term of the last entry added to the log
*/
unsigned int last_term;
/**
* Max number of records to keep in the database
*/
@ -268,21 +275,6 @@ private:
static const char * db_bootstrap;
/**
* Callback to initialize the next_index and last_appled varibales.
*/
int setup_index_cb(void *nil, int num, char **values, char **names);
/**
* SQL callback for log record SELECT commands
*/
int select_cb(void *req, int num, char **values, char **names);
/**
* SQL callback for loading the raft state
*/
int raft_state_cb(void *str_value, int num, char **values, char **names);
/**
* Inserts or update a log record in the database
* @param index of the log entry

View File

@ -710,7 +710,7 @@ void RaftManager::send_heartbeat()
void RaftManager::request_vote()
{
unsigned int lindex, lterm, fterm;
unsigned int lindex, lterm, fterm, _term, _server_id;
std::map<unsigned int, std::string> _servers;
std::map<unsigned int, std::string>::iterator it;
@ -738,6 +738,8 @@ void RaftManager::request_vote()
return;
}
_servers = servers;
term = term + 1;
votedfor = server_id;
@ -748,6 +750,9 @@ void RaftManager::request_vote()
votes2go = num_servers / 2;
_term = term;
_server_id = server_id;
pthread_mutex_unlock(&mutex);
logdb->insert_raft_state(raft_state_xml, true);
@ -756,7 +761,7 @@ void RaftManager::request_vote()
for (it = _servers.begin(); it != _servers.end() ; ++it, oss.str("") )
{
if ( it->first == (unsigned int) server_id )
if ( it->first == _server_id )
{
continue;
}
@ -765,27 +770,33 @@ void RaftManager::request_vote()
if ( rc == -1 )
{
oss << "Error sending vote request to follower " << it->first <<": "
<< error;
NebulaLog::log("RCM", Log::INFO, oss);
NebulaLog::log("RCM", Log::INFO, error);
}
else if ( success == false && fterm > term )
else if ( success == false )
{
oss << "Follower " << it->first << " has a higher term, turning "
<< "into follower";
oss << "Vote not granted from follower " << it->first << ": "
<< error;
NebulaLog::log("RCM", Log::INFO, oss);
NebulaLog::log("RCM", Log::INFO, oss);
follower(fterm);
if ( fterm > _term )
{
oss.str("");
oss << "Follower " << it->first << " is in term " << fterm
<< " current term is "<< _term << ". Turning into follower";
break;
NebulaLog::log("RCM", Log::INFO, oss);
follower(fterm);
break;
}
}
else if ( success == true )
{
granted_votes++;
oss << "Got vote from server: " << it->first << ". Total votes: "
oss << "Got vote from follower " << it->first << ". Total votes: "
<< granted_votes;
NebulaLog::log("RCM", Log::INFO, oss);
@ -811,8 +822,7 @@ void RaftManager::request_vote()
{
state = FOLLOWER;
last_heartbeat.tv_sec = 0;
last_heartbeat.tv_nsec= 0;
clock_gettime(CLOCK_REALTIME, &last_heartbeat);
}
pthread_mutex_unlock(&mutex);

View File

@ -102,13 +102,14 @@ void ReplicaThread::do_replication()
// ---------------------------------------------------------------------
// Get parameters to call append entries on follower
// ---------------------------------------------------------------------
int next_index = raftm->get_next_index(follower_id);
LogDBRecord * lr = logdb->get_log_record(next_index);
LogDBRecord lr;
int next_index = raftm->get_next_index(follower_id);
bool success = false;
unsigned int follower_term = -1;
if ( lr == 0 )
if ( logdb->get_log_record(next_index, lr) != 0 )
{
ostringstream ess;
@ -119,11 +120,9 @@ void ReplicaThread::do_replication()
continue;
}
int xml_rc = raftm->xmlrpc_replicate_log(follower_id, lr, success,
int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success,
follower_term, error);
delete lr;
if ( xml_rc == -1 )
{
NebulaLog::log("RCM", Log::ERROR, error);

View File

@ -788,6 +788,7 @@ void RequestManager::register_xml_methods()
xmlrpc_c::methodPtr zone_addserver(zone_addserver_pt);
xmlrpc_c::methodPtr zone_delserver(zone_delserver_pt);
xmlrpc_c::methodPtr zone_replicatelog(new ZoneReplicateLog());
xmlrpc_c::methodPtr zone_voterequest(new ZoneVoteRequest());
xmlrpc_c::methodPtr zone_info(new ZoneInfo());
xmlrpc_c::methodPtr zonepool_info(new ZonePoolInfo());
@ -798,6 +799,7 @@ void RequestManager::register_xml_methods()
RequestManagerRegistry.addMethod("one.zone.info", zone_info);
RequestManagerRegistry.addMethod("one.zone.rename", zone_rename);
RequestManagerRegistry.addMethod("one.zone.replicate",zone_replicatelog);
RequestManagerRegistry.addMethod("one.zone.voterequest",zone_voterequest);
RequestManagerRegistry.addMethod("one.zone.addserver", zone_addserver);
RequestManagerRegistry.addMethod("one.zone.delserver", zone_delserver);

View File

@ -139,7 +139,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
unsigned int current_term = raftm->get_term();
LogDBRecord * lr;
LogDBRecord lr, prev_lr;
if ( att.uid != 0 )
{
@ -189,36 +189,28 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
//REPLICATE
if ( index > 0 )
{
lr = logdb->get_log_record(prev_index);
if ( lr == 0 )
if ( logdb->get_log_record(prev_index, prev_lr) != 0 )
{
att.resp_msg = "Previous log record missing";
att.resp_msg = "Error loading previous log record";
att.resp_id = current_term;
failure_response(ACTION, att);
return;
}
if ( lr->prev_term != prev_term )
if ( prev_lr.term != prev_term )
{
delete lr;
att.resp_msg = "Previous log record missmatch";
att.resp_id = current_term;
failure_response(ACTION, att);
return;
}
delete lr;
}
lr = logdb->get_log_record(index);
if ( lr != 0 )
if ( logdb->get_log_record(index, lr) != 0 )
{
if ( lr->term != term )
if ( lr.term != term )
{
logdb->delete_log_records(index);
}

View File

@ -17,6 +17,7 @@
#include "LogDB.h"
#include "Nebula.h"
#include "ZoneServer.h"
#include "Callbackable.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -50,34 +51,24 @@ LogDB::~LogDB()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::setup_index_cb(void *int_value, int num, char **values, char **names)
{
int * value = static_cast<int *>(int_value);
if ( values[0] != 0 && num == 1 )
{
*value = atoi(values[0]);
}
return 0;
}
int LogDB::setup_index(int& _last_applied, int& _last_index)
{
std::ostringstream oss;
int rc = 0;
std::ostringstream oss;
single_cb<int> cb;
_last_applied = 0;
_last_index = -1;
set_callback(static_cast<Callbackable::Callback>(&LogDB::setup_index_cb),
static_cast<void *>(&_last_index));
cb.set_callback(&_last_index);
oss << "SELECT MAX(log_index) FROM logdb";
rc += db->exec_rd(oss, this);
rc += db->exec_rd(oss, &cb);
unset_callback();
cb.unset_callback();
if ( rc == 0 )
{
@ -90,14 +81,13 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
oss.str("");
set_callback(static_cast<Callbackable::Callback>(&LogDB::setup_index_cb),
static_cast<void *>(&_last_applied));
cb.set_callback(&_last_applied);
oss << "SELECT MAX(log_index) FROM logdb WHERE timestamp != 0";
rc += db->exec_rd(oss, this);
rc += db->exec_rd(oss, &cb);
unset_callback();
cb.unset_callback();
if ( rc == 0 )
{
@ -114,39 +104,10 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::select_cb(void *req, int num, char **values, char **names)
{
if ( !values[0] || !values[1] || !values[2] || !values[3] || !values[4]
|| !values[5] || num != 6 )
{
return -1;
}
LogDBRecord ** request = static_cast<LogDBRecord **>(req);
*request = new LogDBRecord;
(*request)->index = static_cast<unsigned int>(atoi(values[0]));
(*request)->term = static_cast<unsigned int>(atoi(values[1]));
(*request)->sql = values[2];
(*request)->timestamp = static_cast<unsigned int>(atoi(values[3]));
(*request)->prev_index = static_cast<unsigned int>(atoi(values[4]));
(*request)->prev_term = static_cast<unsigned int>(atoi(values[5]));
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDBRecord * LogDB::get_log_record(unsigned int index)
int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
{
ostringstream oss;
LogDBRecord * request = 0;
int prev_index = index - 1;
if ( prev_index < 0 )
@ -158,43 +119,73 @@ LogDBRecord * LogDB::get_log_record(unsigned int index)
<< " FROM logdb c, logdb p WHERE c.log_index = " << index
<< " AND p.log_index = " << prev_index;
set_callback(static_cast<Callbackable::Callback>(&LogDB::select_cb),
static_cast<void *>(&request));
lr.set_callback();
db->exec_rd(oss, this);
int rc = db->exec_rd(oss, &lr);
unset_callback();
lr.unset_callback();
return request;
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::raft_state_cb(void *str_value, int num, char **values, char **names)
int LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
{
std::string * value = static_cast<std::string *>(str_value);
if ( values[0] != 0 && num == 1 )
class db_callback : public Callbackable
{
*value = values[0];
}
public:
int index;
int term;
return 0;
int cb(void *_db_cbk, int num, char **values, char **names)
{
if ( values == 0 || values[0] == 0 || values[1] == 0 || num != 2 )
{
return -1;
}
index = atoi(values[0]);
term = atoi(values[1]);
return 0;
}
} db_cbk;
std::ostringstream oss;
db_cbk.set_callback(static_cast<Callbackable::Callback>(&db_callback::cb));
oss << "SELECT log_index, term FROM logdb WHERE log_index=(SELECT "
<< "MAX(log_index) FROM logdb)";
int rc = db->exec_rd(oss, &db_cbk);
db_cbk.unset_callback();
_i = db_cbk.index;
_t = db_cbk.term;
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::get_raft_state(std::string &raft_xml)
{
ostringstream oss;
single_cb<std::string> cb;
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
set_callback(static_cast<Callbackable::Callback>(&LogDB::raft_state_cb),
static_cast<void *>(&raft_xml));
cb.set_callback(&raft_xml);
int rc = db->exec_rd(oss, this);
int rc = db->exec_rd(oss, &cb);
unset_callback();
cb.unset_callback();
if ( raft_xml.empty() )
{
@ -219,16 +210,7 @@ int LogDB::insert_replace(int index, int term, const std::string& sql,
return -1;
}
if (replace)
{
oss << "REPLACE";
}
else
{
oss << "INSERT";
}
oss << " INTO " << table << " ("<< db_names <<") VALUES ("
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES ("
<< index << ","
<< term << ","
<< "'" << sql_db << "',"
@ -283,8 +265,6 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
next_index++;
last_term = term;
pthread_mutex_unlock(&mutex);
return index;
@ -394,17 +374,15 @@ int LogDB::apply_log_records(unsigned int commit_index)
while (last_applied < commit_index )
{
LogDBRecord * lr = get_log_record(last_applied + 1);
LogDBRecord lr;
if ( lr == 0 )
if ( get_log_record(last_applied + 1, lr) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
rc = apply_log_record(lr);
delete lr;
rc = apply_log_record(&lr);
if ( rc != 0 )
{