1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-20 14:03:36 +03:00

F #4809: Adds vote requests from candidate, fix some bugs

This commit is contained in:
Ruben S. Montero 2017-05-03 20:04:42 +02:00
parent 2a695bc8f0
commit c6a7500df5
8 changed files with 482 additions and 216 deletions

View File

@ -135,7 +135,7 @@ public:
*/ */
int insert_raft_state(std::string& raft_xml, bool replace) int insert_raft_state(std::string& raft_xml, bool replace)
{ {
return insert_replace(0, -1, raft_xml, 0, replace); return insert_replace(-1, -1, raft_xml, 0, replace);
} }
/** /**

View File

@ -77,15 +77,6 @@ public:
*/ */
void replicate_log(ReplicaRequest * rr); void replicate_log(ReplicaRequest * rr);
/**
* Makes this server leader, and start replica threads
*/
void leader(unsigned int term);
/**
* Makes this server follower. Stop associated replication facilities
*/
void follower(unsigned int term);
/** /**
* Finalizes the Raft Consensus Manager * Finalizes the Raft Consensus Manager
@ -108,6 +99,11 @@ public:
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Raft state query functions // Raft state query functions
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/**
* Makes this server follower. Stop associated replication facilities
*/
void follower(unsigned int term);
unsigned int get_term() unsigned int get_term()
{ {
unsigned int _term; unsigned int _term;
@ -164,12 +160,6 @@ public:
return _commit; return _commit;
} }
/**
* Update the term for this server. The state will be stored in the DB.
* @param _term the new term
*/
void update_term(unsigned int _term);
/** /**
* Evaluates a vote request. It is granted if no vote has been granted in * Evaluates a vote request. It is granted if no vote has been granted in
* this term or it is requested by the same candidate. * this term or it is requested by the same candidate.
@ -241,6 +231,20 @@ public:
int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success, int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success,
unsigned int& ft, std::string& error); unsigned int& ft, std::string& error);
/**
* Calls the request vote xml-rpc method
* @param follower_id to make the call
* @param lindex highest last log index
* @param lterm highest last log term
* @param success of the xml-rpc method
* @param ft term in the follower as returned by the replicate call
* @param error describing error if any
* @return -1 if a XMl-RPC (network) error occurs, 0 otherwise
*/
int xmlrpc_request_vote(int follower_id, unsigned int lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error);
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Server related interface // Server related interface
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -393,6 +397,16 @@ private:
* Send the heartbeat to the followers. * Send the heartbeat to the followers.
*/ */
void send_heartbeat(); void send_heartbeat();
/**
* Request votes of followers
*/
void request_vote();
/**
* Makes this server leader, and start replica threads
*/
void leader();
}; };
#endif /*RAFT_MANAGER_H_*/ #endif /*RAFT_MANAGER_H_*/

View File

@ -88,7 +88,7 @@ class ZoneReplicateLog : public RequestManagerZone
public: public:
ZoneReplicateLog(): ZoneReplicateLog():
RequestManagerZone("ZoneReplicateLog", "Replicate a log record", RequestManagerZone("ZoneReplicateLog", "Replicate a log record",
"A:siiiiiis"){}; "A:siiiiiiis"){};
~ZoneReplicateLog(){}; ~ZoneReplicateLog(){};

View File

@ -309,7 +309,7 @@ void Nebula::start(bool bootstrap_only)
db_backend = new MySqlDB(server, port, user, passwd, db_name); db_backend = new MySqlDB(server, port, user, passwd, db_name);
} }
solo = server_id == -1; solo = server_id == -1 || bootstrap_only;
if ( solo ) if ( solo )
{ {
@ -1040,37 +1040,6 @@ void Nebula::start(bool bootstrap_only)
throw runtime_error("Could not start the Request Manager"); throw runtime_error("Could not start the Request Manager");
} }
// -----------------------------------------------------------
// Start HA mode if working in a cluster of oned's
// -----------------------------------------------------------
if ( server_id == -1 )
{
NebulaLog::log("ONE", Log::INFO, "No SERVER_ID defined, oned started "
"in solo mode.");
}
else
{
////////////////////////////////////////////////////////////////////////
// LOG REPLICATION DEBUG //
////////////////////////////////////////////////////////////////////////
if ( server_id == 0 )
{
raftm->leader(0);
}
else
{
raftm->follower(0);
}
////////////////////////////////////////////////////////////////////////
// TODO:
// - start all servers in follower mode
// - load current term from DB (sysconfig attribute?)
// - load last vote casted
////////////////////////////////////////////////////////////////////////
}
// ----------------------------------------------------------- // -----------------------------------------------------------
// Wait for a SIGTERM or SIGINT signal // Wait for a SIGTERM or SIGINT signal
// ----------------------------------------------------------- // -----------------------------------------------------------

View File

@ -36,39 +36,34 @@ static void set_timeout(long long ms, struct timespec& timeout)
timeout.tv_nsec = d.rem * 1000000; timeout.tv_nsec = d.rem * 1000000;
} }
static unsigned int get_zone_servers(std::map<unsigned int, std::string>& _s);
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* RaftManager component life-cycle functions */
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
RaftManager::RaftManager(int id, time_t log_purge, long long bcast, RaftManager::RaftManager(int id, time_t log_purge, long long bcast,
long long elect, time_t xmlrpc):server_id(id), term(0), num_servers(0), long long elect, time_t xmlrpc):server_id(id), term(0), num_servers(0),
commit(0) commit(0)
{ {
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
std::string raft_xml; std::string raft_xml;
pthread_mutex_init(&mutex, 0); pthread_mutex_init(&mutex, 0);
if ( server_id == -1 )
{
state = SOLO;
}
else
{
state = FOLLOWER;
}
am.addListener(this); am.addListener(this);
purge_period_ms = log_purge * 1000; // -------------------------------------------------------------------------
xmlrpc_timeout_ms = xmlrpc; // Initialize Raft variables:
// - state
set_timeout(bcast, broadcast_timeout); // - servers
//TODO: RANDOM INITALIZATIZION OF BASE ELECT // - votedfor
set_timeout(elect, election_timeout); // - term
// -------------------------------------------------------------------------
// Warm up of 5 seconds to start the election timeout
clock_gettime(CLOCK_REALTIME, &last_heartbeat);
last_heartbeat.tv_sec += 5;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
if ( logdb->get_raft_state(raft_xml) != 0 ) if ( logdb->get_raft_state(raft_xml) != 0 )
{ {
raft_state.replace("TERM", 0); raft_state.replace("TERM", 0);
@ -88,6 +83,33 @@ RaftManager::RaftManager(int id, time_t log_purge, long long bcast,
raft_state.get("TERM", term); raft_state.get("TERM", term);
raft_state.get("VOTEDFOR", votedfor); raft_state.get("VOTEDFOR", votedfor);
} }
num_servers = get_zone_servers(servers);
if ( server_id == -1 )
{
NebulaLog::log("ONE", Log::INFO, "oned started in solo mode.");
state = SOLO;
}
else
{
NebulaLog::log("RCM", Log::INFO, "oned started in follower mode");
state = FOLLOWER;
}
// -------------------------------------------------------------------------
// Initialize Raft timers
// TODO: randomize election timeout
// -------------------------------------------------------------------------
purge_period_ms = log_purge * 1000;
xmlrpc_timeout_ms = xmlrpc;
set_timeout(bcast, broadcast_timeout);
set_timeout(elect, election_timeout);
// 5 seconds warm-up to start election
clock_gettime(CLOCK_REALTIME, &last_heartbeat);
last_heartbeat.tv_sec += 5;
}; };
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
@ -141,6 +163,9 @@ void RaftManager::finalize_action(const ActionRequest& ar)
NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager..."); NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager...");
} }
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* Server management interface */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
@ -230,10 +255,13 @@ void RaftManager::delete_server(unsigned int follower_id)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
}; };
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* State transitions & and callbacks */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
void RaftManager::leader(unsigned int _term) void RaftManager::leader()
{ {
LogDB * logdb = Nebula::instance().get_logdb(); LogDB * logdb = Nebula::instance().get_logdb();
@ -242,47 +270,44 @@ void RaftManager::leader(unsigned int _term)
int index, _applied; int index, _applied;
unsigned int _num_servers;
std::map<unsigned int, std::string> _servers; std::map<unsigned int, std::string> _servers;
std::ostringstream oss; std::ostringstream oss;
std::string raft_state_xml;
logdb->setup_index(_applied, index);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
if ( state != FOLLOWER ) if ( state != CANDIDATE )
{ {
NebulaLog::log("RCM", Log::INFO, "Cannot become leader, no longer "
"candidate");
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return; return;
} }
pthread_mutex_unlock(&mutex);
//--------------------------------------------------------------------------
// Initialize leader variables
//--------------------------------------------------------------------------
logdb->setup_index(_applied, index);
oss << "Becoming leader of zone. Last log record: " << index << " last " oss << "Becoming leader of zone. Last log record: " << index << " last "
<< "applied record: " << _applied; << "applied record: " << _applied;
NebulaLog::log("RCM", Log::INFO, oss); NebulaLog::log("RCM", Log::INFO, oss);
_num_servers = get_zone_servers(_servers);
pthread_mutex_lock(&mutex);
next.clear(); next.clear();
match.clear(); match.clear();
requests.clear(); requests.clear();
num_servers = _num_servers;
servers = _servers;
state = LEADER; state = LEADER;
commit = _applied; commit = _applied;
term = _term; votedfor = -1;
raft_state.replace("VOTEDFOR", votedfor);
raft_state.to_xml(raft_state_xml);
for (it = servers.begin(); it != servers.end() ; ++it ) for (it = servers.begin(); it != servers.end() ; ++it )
{ {
@ -302,6 +327,8 @@ void RaftManager::leader(unsigned int _term)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
logdb->insert_raft_state(raft_state_xml, true);
NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone"); NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
} }
@ -315,6 +342,8 @@ void RaftManager::follower(unsigned int _term)
Nebula& nd = Nebula::instance(); Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb(); LogDB * logdb = nd.get_logdb();
std::string raft_state_xml;
logdb->setup_index(lapplied, lindex); logdb->setup_index(lapplied, lindex);
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
@ -324,6 +353,14 @@ void RaftManager::follower(unsigned int _term)
state = FOLLOWER; state = FOLLOWER;
term = _term; term = _term;
votedfor = -1;
commit = lapplied;
raft_state.replace("VOTEDFOR", votedfor);
raft_state.replace("TERM", term);
raft_state.to_xml(raft_state_xml);
NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode"); NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode");
@ -338,9 +375,14 @@ void RaftManager::follower(unsigned int _term)
it->second->notify(); it->second->notify();
} }
next.clear();
match.clear();
requests.clear(); requests.clear();
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
logdb->insert_raft_state(raft_state_xml, true);
} }
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
@ -456,6 +498,9 @@ void RaftManager::replicate_failure(unsigned int follower_id)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* Raft state interface */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
@ -471,41 +516,6 @@ void RaftManager::update_last_heartbeat()
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
void RaftManager::update_term(unsigned int _term)
{
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
std::string raft_state_xml;
bool new_term = false;
pthread_mutex_lock(&mutex);
if ( _term > term )
{
NebulaLog::log("RCM", Log::INFO, "Follower entering a new term.");
new_term = true;
votedfor = -1;
raft_state.replace("TERM", _term);
raft_state.to_xml(raft_state_xml);
}
term = _term;
pthread_mutex_unlock(&mutex);
if ( new_term == true )
{
logdb->insert_raft_state(raft_state_xml, true);
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::update_votedfor(int _votedfor) int RaftManager::update_votedfor(int _votedfor)
{ {
Nebula& nd = Nebula::instance(); Nebula& nd = Nebula::instance();
@ -535,69 +545,6 @@ int RaftManager::update_votedfor(int _votedfor)
return 0; return 0;
} }
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::send_heartbeat()
{
std::map<unsigned int, std::string> _servers;
std::map<unsigned int, std::string>::iterator it;
LogDBRecord lr;
bool success;
unsigned int fterm;
std::string error;
lr.index = 0;
lr.prev_index = 0;
lr.term = 0;
lr.prev_term = 0;
lr.sql = "";
lr.timestamp = 0;
pthread_mutex_lock(&mutex);
if ( state != LEADER )
{
pthread_mutex_unlock(&mutex);
return;
}
_servers = servers;
pthread_mutex_unlock(&mutex);
for (it = _servers.begin(); it != _servers.end() ; ++it )
{
if ( it->first == (unsigned int) server_id )
{
continue;
}
int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error);
if ( rc == -1 )
{
std::ostringstream oss;
oss << "Error sending heartbeat to follower " << it->first <<": "
<< error;
NebulaLog::log("RCM", Log::INFO, oss);
}
else if ( success == false && fterm > term )
{
follower(fterm);
break;
}
}
}
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
@ -659,15 +606,22 @@ void RaftManager::timer_action(const ActionRequest& ar)
time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec; time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec;
long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec; long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec;
pthread_mutex_unlock(&mutex);
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
nsec <= the_time.tv_nsec)) nsec <= the_time.tv_nsec))
{ {
NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from " NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from "
"leader. Starting election proccess"); "leader. Starting election proccess");
//TODO
state = CANDIDATE;
clock_gettime(CLOCK_REALTIME, &last_heartbeat);
pthread_mutex_unlock(&mutex);
request_vote();
} }
pthread_mutex_unlock(&mutex);
} }
else else
{ {
@ -677,14 +631,202 @@ void RaftManager::timer_action(const ActionRequest& ar)
return; return;
} }
// ----------------------------------------------------------------------------- /* -------------------------------------------------------------------------- */
// ----------------------------------------------------------------------------- /* -------------------------------------------------------------------------- */
/* XML-RPC interface to talk to followers */
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::send_heartbeat()
{
std::map<unsigned int, std::string> _servers;
std::map<unsigned int, std::string>::iterator it;
LogDBRecord lr;
bool success;
unsigned int fterm;
std::string error;
lr.index = 0;
lr.prev_index = 0;
lr.term = 0;
lr.prev_term = 0;
lr.sql = "";
lr.timestamp = 0;
pthread_mutex_lock(&mutex);
if ( state != LEADER )
{
pthread_mutex_unlock(&mutex);
return;
}
_servers = servers;
pthread_mutex_unlock(&mutex);
for (it = _servers.begin(); it != _servers.end() ; ++it )
{
if ( it->first == (unsigned int) server_id )
{
continue;
}
int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error);
if ( rc == -1 )
{
std::ostringstream oss;
oss << "Error sending heartbeat to follower " << it->first <<": "
<< error;
NebulaLog::log("RCM", Log::INFO, oss);
}
else if ( success == false && fterm > term )
{
std::ostringstream oss;
oss << "Follower " << it->first << " term (" << fterm
<< ") is higher than current (" << term << ")";
NebulaLog::log("RCM", Log::INFO, oss);
follower(fterm);
break;
}
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::request_vote()
{
unsigned int lindex, lterm, fterm;
std::map<unsigned int, std::string> _servers;
std::map<unsigned int, std::string>::iterator it;
std::ostringstream oss;
unsigned int granted_votes = 0;
unsigned int votes2go;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
int rc;
std::string error;
std::string raft_state_xml;
bool success;
pthread_mutex_lock(&mutex);
if ( state != CANDIDATE )
{
pthread_mutex_unlock(&mutex);
return;
}
term = term + 1;
votedfor = server_id;
raft_state.replace("TERM", term);
raft_state.replace("VOTEDFOR", votedfor);
raft_state.to_xml(raft_state_xml);
votes2go = num_servers / 2;
pthread_mutex_unlock(&mutex);
logdb->insert_raft_state(raft_state_xml, true);
logdb->get_last_record_index(lindex, lterm);
for (it = _servers.begin(); it != _servers.end() ; ++it, oss.str("") )
{
if ( it->first == (unsigned int) server_id )
{
continue;
}
rc = xmlrpc_request_vote(it->first, lindex, lterm, success, fterm, error);
if ( rc == -1 )
{
oss << "Error sending vote request to follower " << it->first <<": "
<< error;
NebulaLog::log("RCM", Log::INFO, oss);
}
else if ( success == false && fterm > term )
{
oss << "Follower " << it->first << " has a higher term, turning "
<< "into follower";
NebulaLog::log("RCM", Log::INFO, oss);
follower(fterm);
break;
}
else if ( success == true )
{
granted_votes++;
oss << "Got vote from server: " << it->first << ". Total votes: "
<< granted_votes;
NebulaLog::log("RCM", Log::INFO, oss);
}
if ( granted_votes >= votes2go )
{
NebulaLog::log("RCM", Log::INFO, "Got majority of votes");
break;
}
}
if ( granted_votes >= votes2go )
{
leader();
return;
}
//This election process failed, start a new one by expiring heartbeat
pthread_mutex_lock(&mutex);
if (state == CANDIDATE)
{
state = FOLLOWER;
last_heartbeat.tv_sec = 0;
last_heartbeat.tv_nsec= 0;
}
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
bool& success, unsigned int& fterm, std::string& error) bool& success, unsigned int& fterm, std::string& error)
{ {
int _server_id; int _server_id;
int _commit; int _commit;
int _term;
static const std::string replica_method = "one.zone.replicate"; static const std::string replica_method = "one.zone.replicate";
@ -712,6 +854,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
follower_edp = it->second; follower_edp = it->second;
_commit = commit; _commit = commit;
_term = term;
_server_id = server_id; _server_id = server_id;
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
@ -736,6 +879,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
replica_params.add(xmlrpc_c::value_string(secret)); replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(_server_id)); replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(_commit)); replica_params.add(xmlrpc_c::value_int(_commit));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(lr->index)); replica_params.add(xmlrpc_c::value_int(lr->index));
replica_params.add(xmlrpc_c::value_int(lr->term)); replica_params.add(xmlrpc_c::value_int(lr->term));
replica_params.add(xmlrpc_c::value_int(lr->prev_index)); replica_params.add(xmlrpc_c::value_int(lr->prev_index));
@ -804,3 +948,127 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
return xml_rc; return xml_rc;
} }
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error)
{
int _server_id;
int _term;
static const std::string replica_method = "one.zone.voterequest";
std::ostringstream ess;
std::string secret;
std::string follower_edp;
std::map<unsigned int, std::string>::iterator it;
int xml_rc = 0;
pthread_mutex_lock(&mutex);
it = servers.find(follower_id);
if ( it == servers.end() )
{
error = "Cannot find follower end point";
pthread_mutex_unlock(&mutex);
return -1;
}
follower_edp = it->second;
_term = term;
_server_id = server_id;
pthread_mutex_unlock(&mutex);
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
// -------------------------------------------------------------------------
if ( Client::read_oneauth(secret, error) == -1 )
{
NebulaLog::log("RRM", Log::ERROR, error);
return -1;
}
xmlrpc_c::carriageParm_curl0 carriage(follower_edp);
xmlrpc_c::paramList replica_params;
xmlrpc_c::clientXmlTransport_curl transport;
xmlrpc_c::client_xml client(&transport);
replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(lindex));
replica_params.add(xmlrpc_c::value_int(lterm));
xmlrpc_c::rpcPtr rpc_client(replica_method, replica_params);
// -------------------------------------------------------------------------
// Do the XML-RPC call
// -------------------------------------------------------------------------
try
{
rpc_client->start(&client, &carriage);
client.finishAsync(xmlrpc_c::timeout(xmlrpc_timeout_ms));
if (!rpc_client->isFinished())
{
rpc_client->finishErr(girerr::error("XMLRPC method "+replica_method
+ " timeout, resetting call"));
}
if ( rpc_client->isSuccessful() )
{
vector<xmlrpc_c::value> values;
xmlrpc_c::value result = rpc_client->getResult();
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
fterm = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
fterm = xmlrpc_c::value_int(values[3]);
}
}
else //RPC failed, vote not granted
{
xmlrpc_c::fault failure = rpc_client->getFault();
ess << "Error requesting vote from follower " << follower_id << ": "
<< failure.getDescription();
error = ess.str();
xml_rc = -1;
}
}
catch (exception const& e)
{
ess << "Error requesting vote from follower " << follower_id << ": "
<< e.what();
error = ess.str();
xml_rc = -1;
}
return xml_rc;
}

View File

@ -151,6 +151,13 @@ void ReplicaThread::do_replication()
{ {
if ( follower_term > term ) if ( follower_term > term )
{ {
ostringstream ess;
ess << "Follower " << follower_id << " term (" << follower_term
<< ") is higher than current (" << term << ")";
NebulaLog::log("RCM", Log::INFO, ess);
raftm->follower(follower_term); raftm->follower(follower_term);
} }
else else

View File

@ -127,14 +127,15 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
RaftManager * raftm = nd.get_raftm(); RaftManager * raftm = nd.get_raftm();
int leader_id = xmlrpc_c::value_int(paramList.getInt(1)); int leader_id = xmlrpc_c::value_int(paramList.getInt(1));
int commit = xmlrpc_c::value_int(paramList.getInt(2)); int leader_commit = xmlrpc_c::value_int(paramList.getInt(2));
unsigned int leader_term = xmlrpc_c::value_int(paramList.getInt(3));
unsigned int index = xmlrpc_c::value_int(paramList.getInt(3)); unsigned int index = xmlrpc_c::value_int(paramList.getInt(4));
unsigned int term = xmlrpc_c::value_int(paramList.getInt(4)); unsigned int term = xmlrpc_c::value_int(paramList.getInt(5));
unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(5)); unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(6)); unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7));
string sql = xmlrpc_c::value_string(paramList.getString(7)); string sql = xmlrpc_c::value_string(paramList.getString(8));
unsigned int current_term = raftm->get_term(); unsigned int current_term = raftm->get_term();
@ -148,22 +149,31 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
return; return;
} }
if ( !raftm->is_follower() ) if ( leader_term < current_term )
{ {
att.resp_msg = "oned is not a follower. Cannot replicate log record."; std::ostringstream oss;
oss << "Leader term (" << leader_term << ") is outdated ("
<< current_term<<")";
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = current_term; att.resp_id = current_term;
failure_response(ACTION, att); failure_response(ACTION, att);
return; return;
} }
else if ( leader_term > current_term )
if ( term < current_term )
{ {
att.resp_msg = "Leader term is outdated"; std::ostringstream oss;
att.resp_id = current_term;
failure_response(ACTION, att); oss << "New term (" << leader_term << ") discovered from leader "
return; << leader_id;
NebulaLog::log("ReM", Log::INFO, oss);
raftm->follower(leader_term);
} }
raftm->update_last_heartbeat(); raftm->update_last_heartbeat();
@ -225,9 +235,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
return; return;
} }
raftm->update_term(term); unsigned int new_commit = raftm->update_commit(leader_commit, index);
unsigned int new_commit = raftm->update_commit(commit, index);
logdb->apply_log_records(new_commit); logdb->apply_log_records(new_commit);
@ -274,7 +282,6 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
return; return;
} }
if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) && if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) &&
(log_index > candidate_log_index))) (log_index > candidate_log_index)))
{ {
@ -283,7 +290,6 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
failure_response(ACTION, att); failure_response(ACTION, att);
return; return;
} }
if ( raftm->update_votedfor(candidate_id) != 0 ) if ( raftm->update_votedfor(candidate_id) != 0 )
@ -295,6 +301,8 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
return; return;
} }
raftm->update_last_heartbeat();
success_response(static_cast<int>(current_term), att); success_response(static_cast<int>(current_term), att);
} }

View File

@ -187,7 +187,7 @@ int LogDB::get_raft_state(std::string &raft_xml)
{ {
ostringstream oss; ostringstream oss;
oss << "SELECT sql FROM logdb WHERE log_index = 0 AND term = -1"; oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
set_callback(static_cast<Callbackable::Callback>(&LogDB::raft_state_cb), set_callback(static_cast<Callbackable::Callback>(&LogDB::raft_state_cb),
static_cast<void *>(&raft_xml)); static_cast<void *>(&raft_xml));