1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-16 22:50:10 +03:00

F #4809: Make some Raft events synchronous with replication threads

This commit is contained in:
Ruben S. Montero 2017-04-28 22:23:32 +02:00
parent 613ec63426
commit 6f976c1eb8
5 changed files with 56 additions and 186 deletions

View File

@ -165,7 +165,15 @@ public:
*/
int last_index()
{
return next_index - 1;
unsigned int _last_index;
pthread_mutex_lock(&mutex);
_last_index = next_index - 1;
pthread_mutex_unlock(&mutex);
return _last_index;
}
protected:

View File

@ -26,75 +26,6 @@ extern "C" void * raft_manager_loop(void *arg);
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class RaftAction : public ActionRequest
{
public:
enum Actions
{
LEADER, /**< Makes this server leader */
FOLLOWER, /**< Makes this server follower */
REPLICATE_LOG, /**< Replicate a log record on followers */
REPLICATE_SUCCESS,/**< Follower successfully replicated entry */
REPLICATE_FAILURE /**< Follower failed to replicate (same term) */
};
RaftAction(Actions a, ReplicaRequest * rrequest):
ActionRequest(ActionRequest::USER), _action(a), _id(-1),
_rrequest(rrequest){};
RaftAction(Actions a, unsigned int id):
ActionRequest(ActionRequest::USER), _action(a), _id(id), _rrequest(0){};
RaftAction(const RaftAction& o):ActionRequest(o._type),
_action(o._action), _id(o._id), _rrequest(o._rrequest){};
Actions action() const
{
return _action;
}
ActionRequest * clone() const
{
return new RaftAction(*this);
}
unsigned int id() const
{
return _id;
}
ReplicaRequest * request() const
{
return _rrequest;
}
private:
/**
* Type of action to trigger on manager
*/
Actions _action;
/**
* ID of any additional resource associated to this action:
* LEADER - new tem
* FOLLOWER - new term
* REPLICATE_SUCCESS - index of follower that replicated record
* REPLICATE_FAILURE - index of follower that replicated record
* ADD_SERVER - id of new follower
* DELETE_SERVER- id of new follower
*/
int _id;
/**
* Pointer to replica request set for:
* REPLICATE_LOG
*/
ReplicaRequest * _rrequest;
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
class RaftManager : public ActionListener
{
public:
@ -127,61 +58,39 @@ public:
virtual ~RaftManager(){};
// -------------------------------------------------------------------------
// Raft associated events
// Raft associated actions (synchronous)
// -------------------------------------------------------------------------
/**
* Triggers a REPLICATE_SUCCESS event, when a follower successfully
* replicated a log entry.
* Follower successfully replicated a log entry:
* - Increment next entry to send to follower
* - Update match entry on follower
* - Evaluate majority to apply changes to DB
*/
void replicate_success_trigger(unsigned int follower_id)
{
RaftAction ra(RaftAction::REPLICATE_SUCCESS, follower_id);
am.trigger(ra);
}
void replicate_success(unsigned int follower_id);
/**
* Triggers a REPLICATE_FAILURE event, when a follower failed to replicate
* a log entry (but follower_term <= current_term).
* notify the client if a majority of followers replicated this record.
* Follower failed to replicate a log entry because an inconsistency was
* detected (same index, different term):
* - Decrease follower next_index
* - Retry (do not wait for replica events)
*/
void replicate_failure_trigger(unsigned int follower_id)
{
RaftAction ra(RaftAction::REPLICATE_FAILURE, follower_id);
am.trigger(ra);
}
void replicate_failure(unsigned int follower_id);
/**
* Triggers a REPLICATE event, it will notify the replica threads to
* send the log to the followers
*/
void replicate_log_trigger(ReplicaRequest * rr)
{
RaftAction ra(RaftAction::REPLICATE_LOG, rr);
am.trigger(ra);
}
void replicate_log(ReplicaRequest * rr);
/**
* Makes this server leader, and start replica threads
*/
void leader_trigger(unsigned int term)
{
RaftAction ra(RaftAction::LEADER, term);
am.trigger(ra);
}
void leader(unsigned int term);
/**
* Makes this server follower. Stop associated replication facilities
*/
void follower_trigger(unsigned int term)
{
RaftAction ra(RaftAction::FOLLOWER, term);
am.trigger(ra);
}
void follower(unsigned int term);
/**
* Finalizes the Raft Consensus Manager
@ -367,30 +276,6 @@ private:
*/
void finalize_action(const ActionRequest& ar);
// -------------------------------------------------------------------------
// RaftManager actions
// -------------------------------------------------------------------------
void leader_action(const RaftAction& ra);
void follower_action(const RaftAction& ra);
void replicate_log_action(const RaftAction& ra);
// -------------------------------------------------------------------------
// Log entry replicated on follower
// - Increment next entry to send to follower
// - Update match entry on follower
// - Evaluate majority to apply changes to DB
// -------------------------------------------------------------------------
void replicate_success_action(const RaftAction& ra);
//--------------------------------------------------------------------------
// Log inconsistency in follower
// - Decrease follower index
// - Retry (do not wait for replica events)
//--------------------------------------------------------------------------
void replicate_failure_action(const RaftAction& ra);
/**
* @param s the state to check
* @return true if the server states matches the provided one

View File

@ -60,36 +60,6 @@ int RaftManager::start()
/* -------------------------------------------------------------------------- */
void RaftManager::user_action(const ActionRequest& ar)
{
const RaftAction& ra = static_cast<const RaftAction& >(ar);
switch(ra.action())
{
case RaftAction::LEADER:
leader_action(ra);
break;
case RaftAction::FOLLOWER:
follower_action(ra);
break;
case RaftAction::REPLICATE_LOG:
replicate_log_action(ra);
break;
case RaftAction::REPLICATE_SUCCESS:
replicate_success_action(ra);
break;
case RaftAction::REPLICATE_FAILURE:
replicate_failure_action(ra);
break;
}
}
/* -------------------------------------------------------------------------- */
void RaftManager::finalize_action(const ActionRequest& ar)
{
NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager...");
@ -98,7 +68,7 @@ void RaftManager::finalize_action(const ActionRequest& ar)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::leader_action(const RaftAction& ra)
void RaftManager::leader(unsigned int _term)
{
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
@ -193,7 +163,7 @@ void RaftManager::leader_action(const RaftAction& ra)
state = LEADER;
term = ra.id();
term = _term;
num_servers = _num_servers;
@ -207,7 +177,7 @@ void RaftManager::leader_action(const RaftAction& ra)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::follower_action(const RaftAction& ra)
void RaftManager::follower(unsigned int _term)
{
int lapplied, lindex;
@ -222,7 +192,7 @@ void RaftManager::follower_action(const RaftAction& ra)
state = FOLLOWER;
term = ra.id();
term = _term;
NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode");
@ -245,7 +215,7 @@ void RaftManager::follower_action(const RaftAction& ra)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::replicate_log_action(const RaftAction& ra)
void RaftManager::replicate_log(ReplicaRequest * request)
{
pthread_mutex_lock(&mutex);
@ -255,8 +225,6 @@ void RaftManager::replicate_log_action(const RaftAction& ra)
return;
}
ReplicaRequest * request = ra.request();
if ( num_servers <= 1 )
{
request->notify();
@ -284,7 +252,7 @@ void RaftManager::replicate_log_action(const RaftAction& ra)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::replicate_success_action(const RaftAction& ra)
void RaftManager::replicate_success(unsigned int follower_id)
{
std::map<unsigned int, ReplicaRequest *>::iterator it;
@ -294,8 +262,6 @@ void RaftManager::replicate_success_action(const RaftAction& ra)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
int follower_id = ra.id();
pthread_mutex_lock(&mutex);
next_it = next.find(follower_id);
@ -337,19 +303,17 @@ void RaftManager::replicate_success_action(const RaftAction& ra)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::replicate_failure_action(const RaftAction& ra)
void RaftManager::replicate_failure(unsigned int follower_id)
{
std::map<unsigned int, unsigned int>::iterator next_it;
int follower_id = ra.id();
pthread_mutex_lock(&mutex);
next_it = next.find(follower_id);
if ( next_it != next.end() )
{
next_it->second = next_it->second - 1;
next_it->second = next_it->second - 1;
}
pthread_mutex_unlock(&mutex);

View File

@ -197,7 +197,6 @@ int ReplicaThread::xml_rpc_replicate(unsigned int commit, LogDBRecord * lr,
return xml_rc;
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
@ -252,7 +251,7 @@ void ReplicaThread::do_replication()
{
ostringstream ess;
ess << "Failed to load log entry at index: " << next_index;
ess << "Failed to load log record at index: " << next_index;
NebulaLog::log("RCM", Log::ERROR, ess);
@ -282,17 +281,17 @@ void ReplicaThread::do_replication()
if ( success )
{
raftm->replicate_success_trigger(follower_id);
raftm->replicate_success(follower_id);
}
else
{
if ( follower_term > term )
{
raftm->follower_trigger(follower_term);
raftm->follower(follower_term);
}
else
{
raftm->replicate_failure_trigger(follower_id);
raftm->replicate_failure(follower_id);
}
}
}

View File

@ -78,12 +78,16 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
rc += db->exec_rd(oss, this);
unset_callback();
if ( rc == 0 )
{
next_index = _last_index + 1;
}
pthread_mutex_lock(&mutex);
unset_callback();
next_index = _last_index + 1;
pthread_mutex_unlock(&mutex);
}
oss.str("");
@ -94,12 +98,16 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
rc += db->exec_rd(oss, this);
unset_callback();
if ( rc == 0 )
{
last_applied = _last_applied;
}
pthread_mutex_lock(&mutex);
unset_callback();
last_applied = _last_applied;
pthread_mutex_unlock(&mutex);
}
return rc;
}
@ -285,7 +293,7 @@ int LogDB::exec_wr(ostringstream& cmd)
ReplicaRequest rr(rindex);
raftm->replicate_log_trigger(&rr);
raftm->replicate_log(&rr);
// Wait for completion
rr.wait();
@ -345,12 +353,15 @@ int LogDB::apply_log_records(unsigned int commit_index)
{
int rc;
pthread_mutex_lock(&mutex);
while (last_applied < commit_index )
{
LogDBRecord * lr = get_log_record(last_applied + 1);
if ( lr == 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
@ -360,10 +371,13 @@ int LogDB::apply_log_records(unsigned int commit_index)
if ( rc != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
}
pthread_mutex_unlock(&mutex);
return 0;
}