From 6f976c1eb892dc81d01b7f63cd6b5b9ec0b4e50b Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Fri, 28 Apr 2017 22:23:32 +0200 Subject: [PATCH] F #4809: Make some Raft events synchronous with replication threads --- include/LogDB.h | 10 ++- include/RaftManager.h | 143 ++++--------------------------------- src/raft/RaftManager.cc | 52 +++----------- src/raft/ReplicaManager.cc | 9 ++- src/sql/LogDB.cc | 28 ++++++-- 5 files changed, 56 insertions(+), 186 deletions(-) diff --git a/include/LogDB.h b/include/LogDB.h index ae8107cf3d..dd88e83879 100644 --- a/include/LogDB.h +++ b/include/LogDB.h @@ -165,7 +165,15 @@ public: */ int last_index() { - return next_index - 1; + unsigned int _last_index; + + pthread_mutex_lock(&mutex); + + _last_index = next_index - 1; + + pthread_mutex_unlock(&mutex); + + return _last_index; } protected: diff --git a/include/RaftManager.h b/include/RaftManager.h index b26fdf46b7..279d410e26 100644 --- a/include/RaftManager.h +++ b/include/RaftManager.h @@ -26,75 +26,6 @@ extern "C" void * raft_manager_loop(void *arg); /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -class RaftAction : public ActionRequest -{ -public: - enum Actions - { - LEADER, /**< Makes this server leader */ - FOLLOWER, /**< Makes this server follower */ - REPLICATE_LOG, /**< Replicate a log record on followers */ - REPLICATE_SUCCESS,/**< Follower successfully replicated entry */ - REPLICATE_FAILURE /**< Follower failed to replicate (same term) */ - }; - - RaftAction(Actions a, ReplicaRequest * rrequest): - ActionRequest(ActionRequest::USER), _action(a), _id(-1), - _rrequest(rrequest){}; - - RaftAction(Actions a, unsigned int id): - ActionRequest(ActionRequest::USER), _action(a), _id(id), _rrequest(0){}; - - RaftAction(const RaftAction& o):ActionRequest(o._type), - _action(o._action), _id(o._id), _rrequest(o._rrequest){}; - - Actions action() const - { - return _action; - } - - ActionRequest * clone() const - { - return new RaftAction(*this); - } - - unsigned int id() const - { - return _id; - } - - ReplicaRequest * request() const - { - return _rrequest; - } - -private: - /** - * Type of action to trigger on manager - */ - Actions _action; - - /** - * ID of any additional resource associated to this action: - * LEADER - new tem - * FOLLOWER - new term - * REPLICATE_SUCCESS - index of follower that replicated record - * REPLICATE_FAILURE - index of follower that replicated record - * ADD_SERVER - id of new follower - * DELETE_SERVER- id of new follower - */ - int _id; - - /** - * Pointer to replica request set for: - * REPLICATE_LOG - */ - ReplicaRequest * _rrequest; -}; - -// ----------------------------------------------------------------------------- -// ----------------------------------------------------------------------------- - class RaftManager : public ActionListener { public: @@ -127,61 +58,39 @@ public: virtual ~RaftManager(){}; // ------------------------------------------------------------------------- - // Raft associated events + // Raft associated actions (synchronous) // ------------------------------------------------------------------------- /** - * Triggers a REPLICATE_SUCCESS event, when a follower successfully - * replicated a log entry. + * Follower successfully replicated a log entry: + * - Increment next entry to send to follower + * - Update match entry on follower + * - Evaluate majority to apply changes to DB */ - void replicate_success_trigger(unsigned int follower_id) - { - RaftAction ra(RaftAction::REPLICATE_SUCCESS, follower_id); - - am.trigger(ra); - } + void replicate_success(unsigned int follower_id); /** - * Triggers a REPLICATE_FAILURE event, when a follower failed to replicate - * a log entry (but follower_term <= current_term). - * notify the client if a majority of followers replicated this record. + * Follower failed to replicate a log entry because an inconsistency was + * detected (same index, different term): + * - Decrease follower next_index + * - Retry (do not wait for replica events) */ - void replicate_failure_trigger(unsigned int follower_id) - { - RaftAction ra(RaftAction::REPLICATE_FAILURE, follower_id); - - am.trigger(ra); - } + void replicate_failure(unsigned int follower_id); /** * Triggers a REPLICATE event, it will notify the replica threads to * send the log to the followers */ - void replicate_log_trigger(ReplicaRequest * rr) - { - RaftAction ra(RaftAction::REPLICATE_LOG, rr); - - am.trigger(ra); - } + void replicate_log(ReplicaRequest * rr); /** * Makes this server leader, and start replica threads */ - void leader_trigger(unsigned int term) - { - RaftAction ra(RaftAction::LEADER, term); - - am.trigger(ra); - } + void leader(unsigned int term); /** * Makes this server follower. Stop associated replication facilities */ - void follower_trigger(unsigned int term) - { - RaftAction ra(RaftAction::FOLLOWER, term); - - am.trigger(ra); - } + void follower(unsigned int term); /** * Finalizes the Raft Consensus Manager @@ -367,30 +276,6 @@ private: */ void finalize_action(const ActionRequest& ar); - // ------------------------------------------------------------------------- - // RaftManager actions - // ------------------------------------------------------------------------- - void leader_action(const RaftAction& ra); - - void follower_action(const RaftAction& ra); - - void replicate_log_action(const RaftAction& ra); - - // ------------------------------------------------------------------------- - // Log entry replicated on follower - // - Increment next entry to send to follower - // - Update match entry on follower - // - Evaluate majority to apply changes to DB - // ------------------------------------------------------------------------- - void replicate_success_action(const RaftAction& ra); - - //-------------------------------------------------------------------------- - // Log inconsistency in follower - // - Decrease follower index - // - Retry (do not wait for replica events) - //-------------------------------------------------------------------------- - void replicate_failure_action(const RaftAction& ra); - /** * @param s the state to check * @return true if the server states matches the provided one diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index 305ffe1c4c..f3de3a0424 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -60,36 +60,6 @@ int RaftManager::start() /* -------------------------------------------------------------------------- */ -void RaftManager::user_action(const ActionRequest& ar) -{ - const RaftAction& ra = static_cast(ar); - - switch(ra.action()) - { - case RaftAction::LEADER: - leader_action(ra); - break; - - case RaftAction::FOLLOWER: - follower_action(ra); - break; - - case RaftAction::REPLICATE_LOG: - replicate_log_action(ra); - break; - - case RaftAction::REPLICATE_SUCCESS: - replicate_success_action(ra); - break; - - case RaftAction::REPLICATE_FAILURE: - replicate_failure_action(ra); - break; - } -} - -/* -------------------------------------------------------------------------- */ - void RaftManager::finalize_action(const ActionRequest& ar) { NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager..."); @@ -98,7 +68,7 @@ void RaftManager::finalize_action(const ActionRequest& ar) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::leader_action(const RaftAction& ra) +void RaftManager::leader(unsigned int _term) { Nebula& nd = Nebula::instance(); LogDB * logdb = nd.get_logdb(); @@ -193,7 +163,7 @@ void RaftManager::leader_action(const RaftAction& ra) state = LEADER; - term = ra.id(); + term = _term; num_servers = _num_servers; @@ -207,7 +177,7 @@ void RaftManager::leader_action(const RaftAction& ra) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::follower_action(const RaftAction& ra) +void RaftManager::follower(unsigned int _term) { int lapplied, lindex; @@ -222,7 +192,7 @@ void RaftManager::follower_action(const RaftAction& ra) state = FOLLOWER; - term = ra.id(); + term = _term; NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode"); @@ -245,7 +215,7 @@ void RaftManager::follower_action(const RaftAction& ra) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::replicate_log_action(const RaftAction& ra) +void RaftManager::replicate_log(ReplicaRequest * request) { pthread_mutex_lock(&mutex); @@ -255,8 +225,6 @@ void RaftManager::replicate_log_action(const RaftAction& ra) return; } - ReplicaRequest * request = ra.request(); - if ( num_servers <= 1 ) { request->notify(); @@ -284,7 +252,7 @@ void RaftManager::replicate_log_action(const RaftAction& ra) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::replicate_success_action(const RaftAction& ra) +void RaftManager::replicate_success(unsigned int follower_id) { std::map::iterator it; @@ -294,8 +262,6 @@ void RaftManager::replicate_success_action(const RaftAction& ra) Nebula& nd = Nebula::instance(); LogDB * logdb = nd.get_logdb(); - int follower_id = ra.id(); - pthread_mutex_lock(&mutex); next_it = next.find(follower_id); @@ -337,19 +303,17 @@ void RaftManager::replicate_success_action(const RaftAction& ra) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::replicate_failure_action(const RaftAction& ra) +void RaftManager::replicate_failure(unsigned int follower_id) { std::map::iterator next_it; - int follower_id = ra.id(); - pthread_mutex_lock(&mutex); next_it = next.find(follower_id); if ( next_it != next.end() ) { - next_it->second = next_it->second - 1; + next_it->second = next_it->second - 1; } pthread_mutex_unlock(&mutex); diff --git a/src/raft/ReplicaManager.cc b/src/raft/ReplicaManager.cc index f2c3a71d7c..0dba673e0b 100644 --- a/src/raft/ReplicaManager.cc +++ b/src/raft/ReplicaManager.cc @@ -197,7 +197,6 @@ int ReplicaThread::xml_rpc_replicate(unsigned int commit, LogDBRecord * lr, return xml_rc; } - // ----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- @@ -252,7 +251,7 @@ void ReplicaThread::do_replication() { ostringstream ess; - ess << "Failed to load log entry at index: " << next_index; + ess << "Failed to load log record at index: " << next_index; NebulaLog::log("RCM", Log::ERROR, ess); @@ -282,17 +281,17 @@ void ReplicaThread::do_replication() if ( success ) { - raftm->replicate_success_trigger(follower_id); + raftm->replicate_success(follower_id); } else { if ( follower_term > term ) { - raftm->follower_trigger(follower_term); + raftm->follower(follower_term); } else { - raftm->replicate_failure_trigger(follower_id); + raftm->replicate_failure(follower_id); } } } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 7e94d65041..a50c168846 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -78,12 +78,16 @@ int LogDB::setup_index(int& _last_applied, int& _last_index) rc += db->exec_rd(oss, this); + unset_callback(); + if ( rc == 0 ) { - next_index = _last_index + 1; - } + pthread_mutex_lock(&mutex); - unset_callback(); + next_index = _last_index + 1; + + pthread_mutex_unlock(&mutex); + } oss.str(""); @@ -94,12 +98,16 @@ int LogDB::setup_index(int& _last_applied, int& _last_index) rc += db->exec_rd(oss, this); + unset_callback(); + if ( rc == 0 ) { - last_applied = _last_applied; - } + pthread_mutex_lock(&mutex); - unset_callback(); + last_applied = _last_applied; + + pthread_mutex_unlock(&mutex); + } return rc; } @@ -285,7 +293,7 @@ int LogDB::exec_wr(ostringstream& cmd) ReplicaRequest rr(rindex); - raftm->replicate_log_trigger(&rr); + raftm->replicate_log(&rr); // Wait for completion rr.wait(); @@ -345,12 +353,15 @@ int LogDB::apply_log_records(unsigned int commit_index) { int rc; + pthread_mutex_lock(&mutex); + while (last_applied < commit_index ) { LogDBRecord * lr = get_log_record(last_applied + 1); if ( lr == 0 ) { + pthread_mutex_unlock(&mutex); return -1; } @@ -360,10 +371,13 @@ int LogDB::apply_log_records(unsigned int commit_index) if ( rc != 0 ) { + pthread_mutex_unlock(&mutex); return -1; } } + pthread_mutex_unlock(&mutex); + return 0; }