From c6a7500df598c7999607b17349136db7685368da Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Wed, 3 May 2017 20:04:42 +0200 Subject: [PATCH] F #4809: Adds vote requests from candidate, fix some bugs --- include/LogDB.h | 2 +- include/RaftManager.h | 44 ++- include/RequestManagerZone.h | 2 +- src/nebula/Nebula.cc | 33 +-- src/raft/RaftManager.cc | 560 ++++++++++++++++++++++++++--------- src/raft/ReplicaManager.cc | 7 + src/rm/RequestManagerZone.cc | 48 +-- src/sql/LogDB.cc | 2 +- 8 files changed, 482 insertions(+), 216 deletions(-) diff --git a/include/LogDB.h b/include/LogDB.h index 0e7bdc3a0a..1f9cfc2df3 100644 --- a/include/LogDB.h +++ b/include/LogDB.h @@ -135,7 +135,7 @@ public: */ int insert_raft_state(std::string& raft_xml, bool replace) { - return insert_replace(0, -1, raft_xml, 0, replace); + return insert_replace(-1, -1, raft_xml, 0, replace); } /** diff --git a/include/RaftManager.h b/include/RaftManager.h index 56964736d6..82ee03d1f7 100644 --- a/include/RaftManager.h +++ b/include/RaftManager.h @@ -77,15 +77,6 @@ public: */ void replicate_log(ReplicaRequest * rr); - /** - * Makes this server leader, and start replica threads - */ - void leader(unsigned int term); - - /** - * Makes this server follower. Stop associated replication facilities - */ - void follower(unsigned int term); /** * Finalizes the Raft Consensus Manager @@ -108,6 +99,11 @@ public: // ------------------------------------------------------------------------- // Raft state query functions // ------------------------------------------------------------------------- + /** + * Makes this server follower. Stop associated replication facilities + */ + void follower(unsigned int term); + unsigned int get_term() { unsigned int _term; @@ -164,12 +160,6 @@ public: return _commit; } - /** - * Update the term for this server. The state will be stored in the DB. - * @param _term the new term - */ - void update_term(unsigned int _term); - /** * Evaluates a vote request. It is granted if no vote has been granted in * this term or it is requested by the same candidate. @@ -241,6 +231,20 @@ public: int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success, unsigned int& ft, std::string& error); + /** + * Calls the request vote xml-rpc method + * @param follower_id to make the call + * @param lindex highest last log index + * @param lterm highest last log term + * @param success of the xml-rpc method + * @param ft term in the follower as returned by the replicate call + * @param error describing error if any + * @return -1 if a XMl-RPC (network) error occurs, 0 otherwise + */ + int xmlrpc_request_vote(int follower_id, unsigned int lindex, + unsigned int lterm, bool& success, unsigned int& fterm, + std::string& error); + // ------------------------------------------------------------------------- // Server related interface // ------------------------------------------------------------------------- @@ -393,6 +397,16 @@ private: * Send the heartbeat to the followers. */ void send_heartbeat(); + + /** + * Request votes of followers + */ + void request_vote(); + + /** + * Makes this server leader, and start replica threads + */ + void leader(); }; #endif /*RAFT_MANAGER_H_*/ diff --git a/include/RequestManagerZone.h b/include/RequestManagerZone.h index 5ae23b2365..bbc7933dbe 100644 --- a/include/RequestManagerZone.h +++ b/include/RequestManagerZone.h @@ -88,7 +88,7 @@ class ZoneReplicateLog : public RequestManagerZone public: ZoneReplicateLog(): RequestManagerZone("ZoneReplicateLog", "Replicate a log record", - "A:siiiiiis"){}; + "A:siiiiiiis"){}; ~ZoneReplicateLog(){}; diff --git a/src/nebula/Nebula.cc b/src/nebula/Nebula.cc index 989de0e026..39accdb97f 100644 --- a/src/nebula/Nebula.cc +++ b/src/nebula/Nebula.cc @@ -309,7 +309,7 @@ void Nebula::start(bool bootstrap_only) db_backend = new MySqlDB(server, port, user, passwd, db_name); } - solo = server_id == -1; + solo = server_id == -1 || bootstrap_only; if ( solo ) { @@ -1040,37 +1040,6 @@ void Nebula::start(bool bootstrap_only) throw runtime_error("Could not start the Request Manager"); } - // ----------------------------------------------------------- - // Start HA mode if working in a cluster of oned's - // ----------------------------------------------------------- - - if ( server_id == -1 ) - { - NebulaLog::log("ONE", Log::INFO, "No SERVER_ID defined, oned started " - "in solo mode."); - } - else - { - //////////////////////////////////////////////////////////////////////// - // LOG REPLICATION DEBUG // - //////////////////////////////////////////////////////////////////////// - if ( server_id == 0 ) - { - raftm->leader(0); - } - else - { - raftm->follower(0); - } - //////////////////////////////////////////////////////////////////////// - // TODO: - // - start all servers in follower mode - // - load current term from DB (sysconfig attribute?) - // - load last vote casted - //////////////////////////////////////////////////////////////////////// - } - - // ----------------------------------------------------------- // Wait for a SIGTERM or SIGINT signal // ----------------------------------------------------------- diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index 3d08b00dc8..c59c0181ac 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -36,39 +36,34 @@ static void set_timeout(long long ms, struct timespec& timeout) timeout.tv_nsec = d.rem * 1000000; } +static unsigned int get_zone_servers(std::map& _s); + +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ +/* RaftManager component life-cycle functions */ +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + RaftManager::RaftManager(int id, time_t log_purge, long long bcast, long long elect, time_t xmlrpc):server_id(id), term(0), num_servers(0), commit(0) { + Nebula& nd = Nebula::instance(); + LogDB * logdb = nd.get_logdb(); + std::string raft_xml; pthread_mutex_init(&mutex, 0); - if ( server_id == -1 ) - { - state = SOLO; - } - else - { - state = FOLLOWER; - } - am.addListener(this); - purge_period_ms = log_purge * 1000; - xmlrpc_timeout_ms = xmlrpc; - - set_timeout(bcast, broadcast_timeout); - //TODO: RANDOM INITALIZATIZION OF BASE ELECT - set_timeout(elect, election_timeout); - - // Warm up of 5 seconds to start the election timeout - clock_gettime(CLOCK_REALTIME, &last_heartbeat); - last_heartbeat.tv_sec += 5; - - Nebula& nd = Nebula::instance(); - LogDB * logdb = nd.get_logdb(); - + // ------------------------------------------------------------------------- + // Initialize Raft variables: + // - state + // - servers + // - votedfor + // - term + // ------------------------------------------------------------------------- if ( logdb->get_raft_state(raft_xml) != 0 ) { raft_state.replace("TERM", 0); @@ -88,6 +83,33 @@ RaftManager::RaftManager(int id, time_t log_purge, long long bcast, raft_state.get("TERM", term); raft_state.get("VOTEDFOR", votedfor); } + + num_servers = get_zone_servers(servers); + + if ( server_id == -1 ) + { + NebulaLog::log("ONE", Log::INFO, "oned started in solo mode."); + state = SOLO; + } + else + { + NebulaLog::log("RCM", Log::INFO, "oned started in follower mode"); + state = FOLLOWER; + } + + // ------------------------------------------------------------------------- + // Initialize Raft timers + // TODO: randomize election timeout + // ------------------------------------------------------------------------- + purge_period_ms = log_purge * 1000; + xmlrpc_timeout_ms = xmlrpc; + + set_timeout(bcast, broadcast_timeout); + set_timeout(elect, election_timeout); + + // 5 seconds warm-up to start election + clock_gettime(CLOCK_REALTIME, &last_heartbeat); + last_heartbeat.tv_sec += 5; }; /* -------------------------------------------------------------------------- */ @@ -141,6 +163,9 @@ void RaftManager::finalize_action(const ActionRequest& ar) NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager..."); } +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ +/* Server management interface */ /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ @@ -230,10 +255,13 @@ void RaftManager::delete_server(unsigned int follower_id) pthread_mutex_unlock(&mutex); }; +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ +/* State transitions & and callbacks */ /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::leader(unsigned int _term) +void RaftManager::leader() { LogDB * logdb = Nebula::instance().get_logdb(); @@ -242,47 +270,44 @@ void RaftManager::leader(unsigned int _term) int index, _applied; - unsigned int _num_servers; std::map _servers; std::ostringstream oss; + std::string raft_state_xml; + + logdb->setup_index(_applied, index); + pthread_mutex_lock(&mutex); - if ( state != FOLLOWER ) + if ( state != CANDIDATE ) { + NebulaLog::log("RCM", Log::INFO, "Cannot become leader, no longer " + "candidate"); + pthread_mutex_unlock(&mutex); + return; } - pthread_mutex_unlock(&mutex); - - //-------------------------------------------------------------------------- - // Initialize leader variables - //-------------------------------------------------------------------------- - logdb->setup_index(_applied, index); - oss << "Becoming leader of zone. Last log record: " << index << " last " << "applied record: " << _applied; NebulaLog::log("RCM", Log::INFO, oss); - _num_servers = get_zone_servers(_servers); - - pthread_mutex_lock(&mutex); - next.clear(); match.clear(); requests.clear(); - num_servers = _num_servers; - servers = _servers; - state = LEADER; - commit = _applied; - term = _term; + commit = _applied; + votedfor = -1; + + raft_state.replace("VOTEDFOR", votedfor); + + raft_state.to_xml(raft_state_xml); for (it = servers.begin(); it != servers.end() ; ++it ) { @@ -302,6 +327,8 @@ void RaftManager::leader(unsigned int _term) pthread_mutex_unlock(&mutex); + logdb->insert_raft_state(raft_state_xml, true); + NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone"); } @@ -315,6 +342,8 @@ void RaftManager::follower(unsigned int _term) Nebula& nd = Nebula::instance(); LogDB * logdb = nd.get_logdb(); + std::string raft_state_xml; + logdb->setup_index(lapplied, lindex); pthread_mutex_lock(&mutex); @@ -323,7 +352,15 @@ void RaftManager::follower(unsigned int _term) state = FOLLOWER; - term = _term; + term = _term; + votedfor = -1; + + commit = lapplied; + + raft_state.replace("VOTEDFOR", votedfor); + raft_state.replace("TERM", term); + + raft_state.to_xml(raft_state_xml); NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode"); @@ -338,9 +375,14 @@ void RaftManager::follower(unsigned int _term) it->second->notify(); } + next.clear(); + match.clear(); + requests.clear(); pthread_mutex_unlock(&mutex); + + logdb->insert_raft_state(raft_state_xml, true); } /* -------------------------------------------------------------------------- */ @@ -456,6 +498,9 @@ void RaftManager::replicate_failure(unsigned int follower_id) pthread_mutex_unlock(&mutex); } +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ +/* Raft state interface */ /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ @@ -471,41 +516,6 @@ void RaftManager::update_last_heartbeat() /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -void RaftManager::update_term(unsigned int _term) -{ - Nebula& nd = Nebula::instance(); - LogDB * logdb = nd.get_logdb(); - - std::string raft_state_xml; - - bool new_term = false; - - pthread_mutex_lock(&mutex); - - if ( _term > term ) - { - NebulaLog::log("RCM", Log::INFO, "Follower entering a new term."); - - new_term = true; - votedfor = -1; - - raft_state.replace("TERM", _term); - raft_state.to_xml(raft_state_xml); - } - - term = _term; - - pthread_mutex_unlock(&mutex); - - if ( new_term == true ) - { - logdb->insert_raft_state(raft_state_xml, true); - } -} - -/* -------------------------------------------------------------------------- */ -/* -------------------------------------------------------------------------- */ - int RaftManager::update_votedfor(int _votedfor) { Nebula& nd = Nebula::instance(); @@ -535,69 +545,6 @@ int RaftManager::update_votedfor(int _votedfor) return 0; } -/* -------------------------------------------------------------------------- */ -/* -------------------------------------------------------------------------- */ -void RaftManager::send_heartbeat() -{ - std::map _servers; - std::map::iterator it; - - LogDBRecord lr; - - bool success; - unsigned int fterm; - - std::string error; - - lr.index = 0; - lr.prev_index = 0; - - lr.term = 0; - lr.prev_term = 0; - - lr.sql = ""; - - lr.timestamp = 0; - - pthread_mutex_lock(&mutex); - - if ( state != LEADER ) - { - pthread_mutex_unlock(&mutex); - return; - } - - _servers = servers; - - pthread_mutex_unlock(&mutex); - - for (it = _servers.begin(); it != _servers.end() ; ++it ) - { - if ( it->first == (unsigned int) server_id ) - { - continue; - } - - int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error); - - if ( rc == -1 ) - { - std::ostringstream oss; - - oss << "Error sending heartbeat to follower " << it->first <<": " - << error; - - NebulaLog::log("RCM", Log::INFO, oss); - } - else if ( success == false && fterm > term ) - { - follower(fterm); - - break; - } - } -} - /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ @@ -659,15 +606,22 @@ void RaftManager::timer_action(const ActionRequest& ar) time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec; long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec; - pthread_mutex_unlock(&mutex); - if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && nsec <= the_time.tv_nsec)) { NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from " "leader. Starting election proccess"); - //TODO + + state = CANDIDATE; + + clock_gettime(CLOCK_REALTIME, &last_heartbeat); + + pthread_mutex_unlock(&mutex); + + request_vote(); } + + pthread_mutex_unlock(&mutex); } else { @@ -677,14 +631,202 @@ void RaftManager::timer_action(const ActionRequest& ar) return; } -// ----------------------------------------------------------------------------- -// ----------------------------------------------------------------------------- +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ +/* XML-RPC interface to talk to followers */ +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + +void RaftManager::send_heartbeat() +{ + std::map _servers; + std::map::iterator it; + + LogDBRecord lr; + + bool success; + unsigned int fterm; + + std::string error; + + lr.index = 0; + lr.prev_index = 0; + + lr.term = 0; + lr.prev_term = 0; + + lr.sql = ""; + + lr.timestamp = 0; + + pthread_mutex_lock(&mutex); + + if ( state != LEADER ) + { + pthread_mutex_unlock(&mutex); + return; + } + + _servers = servers; + + pthread_mutex_unlock(&mutex); + + for (it = _servers.begin(); it != _servers.end() ; ++it ) + { + if ( it->first == (unsigned int) server_id ) + { + continue; + } + + int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error); + + if ( rc == -1 ) + { + std::ostringstream oss; + + oss << "Error sending heartbeat to follower " << it->first <<": " + << error; + + NebulaLog::log("RCM", Log::INFO, oss); + } + else if ( success == false && fterm > term ) + { + std::ostringstream oss; + + oss << "Follower " << it->first << " term (" << fterm + << ") is higher than current (" << term << ")"; + + NebulaLog::log("RCM", Log::INFO, oss); + + follower(fterm); + + break; + } + } +} + +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + +void RaftManager::request_vote() +{ + unsigned int lindex, lterm, fterm; + + std::map _servers; + std::map::iterator it; + + std::ostringstream oss; + + unsigned int granted_votes = 0; + unsigned int votes2go; + + Nebula& nd = Nebula::instance(); + LogDB * logdb = nd.get_logdb(); + + int rc; + + std::string error; + std::string raft_state_xml; + + bool success; + + pthread_mutex_lock(&mutex); + + if ( state != CANDIDATE ) + { + pthread_mutex_unlock(&mutex); + return; + } + + term = term + 1; + votedfor = server_id; + + raft_state.replace("TERM", term); + raft_state.replace("VOTEDFOR", votedfor); + + raft_state.to_xml(raft_state_xml); + + votes2go = num_servers / 2; + + pthread_mutex_unlock(&mutex); + + logdb->insert_raft_state(raft_state_xml, true); + + logdb->get_last_record_index(lindex, lterm); + + for (it = _servers.begin(); it != _servers.end() ; ++it, oss.str("") ) + { + if ( it->first == (unsigned int) server_id ) + { + continue; + } + + rc = xmlrpc_request_vote(it->first, lindex, lterm, success, fterm, error); + + if ( rc == -1 ) + { + oss << "Error sending vote request to follower " << it->first <<": " + << error; + + NebulaLog::log("RCM", Log::INFO, oss); + } + else if ( success == false && fterm > term ) + { + oss << "Follower " << it->first << " has a higher term, turning " + << "into follower"; + + NebulaLog::log("RCM", Log::INFO, oss); + + follower(fterm); + + break; + } + else if ( success == true ) + { + granted_votes++; + + oss << "Got vote from server: " << it->first << ". Total votes: " + << granted_votes; + + NebulaLog::log("RCM", Log::INFO, oss); + } + + if ( granted_votes >= votes2go ) + { + NebulaLog::log("RCM", Log::INFO, "Got majority of votes"); + break; + } + } + + if ( granted_votes >= votes2go ) + { + leader(); + return; + } + + //This election process failed, start a new one by expiring heartbeat + pthread_mutex_lock(&mutex); + + if (state == CANDIDATE) + { + state = FOLLOWER; + + last_heartbeat.tv_sec = 0; + last_heartbeat.tv_nsec= 0; + } + + pthread_mutex_unlock(&mutex); +} + +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success, unsigned int& fterm, std::string& error) { int _server_id; int _commit; + int _term; static const std::string replica_method = "one.zone.replicate"; @@ -712,6 +854,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, follower_edp = it->second; _commit = commit; + _term = term; _server_id = server_id; pthread_mutex_unlock(&mutex); @@ -736,6 +879,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, replica_params.add(xmlrpc_c::value_string(secret)); replica_params.add(xmlrpc_c::value_int(_server_id)); replica_params.add(xmlrpc_c::value_int(_commit)); + replica_params.add(xmlrpc_c::value_int(_term)); replica_params.add(xmlrpc_c::value_int(lr->index)); replica_params.add(xmlrpc_c::value_int(lr->term)); replica_params.add(xmlrpc_c::value_int(lr->prev_index)); @@ -804,3 +948,127 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, return xml_rc; } +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + +int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex, + unsigned int lterm, bool& success, unsigned int& fterm, + std::string& error) +{ + int _server_id; + int _term; + + static const std::string replica_method = "one.zone.voterequest"; + + std::ostringstream ess; + + std::string secret; + std::string follower_edp; + + std::map::iterator it; + + int xml_rc = 0; + + pthread_mutex_lock(&mutex); + + it = servers.find(follower_id); + + if ( it == servers.end() ) + { + error = "Cannot find follower end point"; + pthread_mutex_unlock(&mutex); + + return -1; + } + + follower_edp = it->second; + + _term = term; + _server_id = server_id; + + pthread_mutex_unlock(&mutex); + + // ------------------------------------------------------------------------- + // Get parameters to call append entries on follower + // ------------------------------------------------------------------------- + if ( Client::read_oneauth(secret, error) == -1 ) + { + NebulaLog::log("RRM", Log::ERROR, error); + return -1; + } + + xmlrpc_c::carriageParm_curl0 carriage(follower_edp); + + xmlrpc_c::paramList replica_params; + + xmlrpc_c::clientXmlTransport_curl transport; + + xmlrpc_c::client_xml client(&transport); + + replica_params.add(xmlrpc_c::value_string(secret)); + replica_params.add(xmlrpc_c::value_int(_term)); + replica_params.add(xmlrpc_c::value_int(_server_id)); + replica_params.add(xmlrpc_c::value_int(lindex)); + replica_params.add(xmlrpc_c::value_int(lterm)); + + xmlrpc_c::rpcPtr rpc_client(replica_method, replica_params); + + // ------------------------------------------------------------------------- + // Do the XML-RPC call + // ------------------------------------------------------------------------- + try + { + rpc_client->start(&client, &carriage); + + client.finishAsync(xmlrpc_c::timeout(xmlrpc_timeout_ms)); + + if (!rpc_client->isFinished()) + { + rpc_client->finishErr(girerr::error("XMLRPC method "+replica_method + + " timeout, resetting call")); + } + + if ( rpc_client->isSuccessful() ) + { + vector values; + + xmlrpc_c::value result = rpc_client->getResult(); + + values = xmlrpc_c::value_array(result).vectorValueValue(); + success = xmlrpc_c::value_boolean(values[0]); + + if ( success ) //values[2] = error code (string) + { + fterm = xmlrpc_c::value_int(values[1]); + } + else + { + error = xmlrpc_c::value_string(values[1]); + fterm = xmlrpc_c::value_int(values[3]); + } + } + else //RPC failed, vote not granted + { + xmlrpc_c::fault failure = rpc_client->getFault(); + + ess << "Error requesting vote from follower " << follower_id << ": " + << failure.getDescription(); + + error = ess.str(); + + xml_rc = -1; + } + } + catch (exception const& e) + { + ess << "Error requesting vote from follower " << follower_id << ": " + << e.what(); + + error = ess.str(); + + xml_rc = -1; + } + + return xml_rc; +} + diff --git a/src/raft/ReplicaManager.cc b/src/raft/ReplicaManager.cc index 81c101b4c5..491eccd881 100644 --- a/src/raft/ReplicaManager.cc +++ b/src/raft/ReplicaManager.cc @@ -151,6 +151,13 @@ void ReplicaThread::do_replication() { if ( follower_term > term ) { + ostringstream ess; + + ess << "Follower " << follower_id << " term (" << follower_term + << ") is higher than current (" << term << ")"; + + NebulaLog::log("RCM", Log::INFO, ess); + raftm->follower(follower_term); } else diff --git a/src/rm/RequestManagerZone.cc b/src/rm/RequestManagerZone.cc index cdd99d1505..fbc3ab0bd4 100644 --- a/src/rm/RequestManagerZone.cc +++ b/src/rm/RequestManagerZone.cc @@ -126,15 +126,16 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList, RaftManager * raftm = nd.get_raftm(); - int leader_id = xmlrpc_c::value_int(paramList.getInt(1)); - int commit = xmlrpc_c::value_int(paramList.getInt(2)); + int leader_id = xmlrpc_c::value_int(paramList.getInt(1)); + int leader_commit = xmlrpc_c::value_int(paramList.getInt(2)); + unsigned int leader_term = xmlrpc_c::value_int(paramList.getInt(3)); - unsigned int index = xmlrpc_c::value_int(paramList.getInt(3)); - unsigned int term = xmlrpc_c::value_int(paramList.getInt(4)); - unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(5)); - unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(6)); + unsigned int index = xmlrpc_c::value_int(paramList.getInt(4)); + unsigned int term = xmlrpc_c::value_int(paramList.getInt(5)); + unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6)); + unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7)); - string sql = xmlrpc_c::value_string(paramList.getString(7)); + string sql = xmlrpc_c::value_string(paramList.getString(8)); unsigned int current_term = raftm->get_term(); @@ -148,22 +149,31 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList, return; } - if ( !raftm->is_follower() ) + if ( leader_term < current_term ) { - att.resp_msg = "oned is not a follower. Cannot replicate log record."; + std::ostringstream oss; + + oss << "Leader term (" << leader_term << ") is outdated (" + << current_term<<")"; + + NebulaLog::log("ReM", Log::INFO, oss); + + att.resp_msg = oss.str(); att.resp_id = current_term; failure_response(ACTION, att); return; } - - if ( term < current_term ) + else if ( leader_term > current_term ) { - att.resp_msg = "Leader term is outdated"; - att.resp_id = current_term; + std::ostringstream oss; - failure_response(ACTION, att); - return; + oss << "New term (" << leader_term << ") discovered from leader " + << leader_id; + + NebulaLog::log("ReM", Log::INFO, oss); + + raftm->follower(leader_term); } raftm->update_last_heartbeat(); @@ -225,9 +235,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList, return; } - raftm->update_term(term); - - unsigned int new_commit = raftm->update_commit(commit, index); + unsigned int new_commit = raftm->update_commit(leader_commit, index); logdb->apply_log_records(new_commit); @@ -274,7 +282,6 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList, return; } - if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) && (log_index > candidate_log_index))) { @@ -283,7 +290,6 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList, failure_response(ACTION, att); return; - } if ( raftm->update_votedfor(candidate_id) != 0 ) @@ -295,6 +301,8 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList, return; } + raftm->update_last_heartbeat(); + success_response(static_cast(current_term), att); } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 3e76dd2538..96abe5a468 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -187,7 +187,7 @@ int LogDB::get_raft_state(std::string &raft_xml) { ostringstream oss; - oss << "SELECT sql FROM logdb WHERE log_index = 0 AND term = -1"; + oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1"; set_callback(static_cast(&LogDB::raft_state_cb), static_cast(&raft_xml));