diff --git a/include/Callbackable.h b/include/Callbackable.h index 279117751c..50df0e418c 100644 --- a/include/Callbackable.h +++ b/include/Callbackable.h @@ -18,6 +18,7 @@ #define CALLBACKABLE_H_ #include +#include using namespace std; @@ -105,4 +106,39 @@ private: pthread_mutex_t mutex; }; +/* -------------------------------------------------------------------------- */ +/* Classes to obtain values from a DB it support concurrent queries using */ +/* different objects */ +/* -------------------------------------------------------------------------- */ + +template +class single_cb : public Callbackable +{ +public: + void set_callback(T * _value) + { + value = _value; + + Callbackable::set_callback( + static_cast(&single_cb::callback)); + } + + virtual int callback(void *nil, int num, char **values, char **names) + { + if ( values == 0 || values[0] == 0 || num != 1 ) + { + return -1; + } + + std::istringstream iss(values[0]); + + iss >> *value; + + return 0; + } + +private: + T * value; +}; + #endif /*CALLBACKABLE_H_*/ diff --git a/include/LogDB.h b/include/LogDB.h index 1f9cfc2df3..c61176c26c 100644 --- a/include/LogDB.h +++ b/include/LogDB.h @@ -25,8 +25,9 @@ /** * This class represents a log record */ -struct LogDBRecord +class LogDBRecord : public Callbackable { +public: /** * Index for this log entry (and previous) */ @@ -50,13 +51,46 @@ struct LogDBRecord * Time when the record has been applied to DB. 0 if not applied */ time_t timestamp; + + /** + * Sets callback to load register from DB + */ + void set_callback() + { + Callbackable::set_callback( + static_cast(&LogDBRecord::select_cb)); + } + +private: + /** + * SQL callback to load logDBRecord from DB (SELECT commands) + */ + int select_cb(void *nil, int num, char **values, char **names) + { + if ( !values || !values[0] || !values[1] || !values[2] || !values[3] || + !values[4] || !values[5] || num != 6 ) + { + return -1; + } + + index = static_cast(atoi(values[0])); + term = static_cast(atoi(values[1])); + sql = values[2]; + + timestamp = static_cast(atoi(values[3])); + + prev_index = static_cast(atoi(values[4])); + prev_term = static_cast(atoi(values[5])); + + return 0; + } }; /** * This class implements a generic DB interface with replication. The associated * DB stores a log to replicate on followers. */ -class LogDB : public SqlDB, Callbackable +class LogDB : public SqlDB { public: LogDB(SqlDB * _db, bool solo, const std::string& log_retention); @@ -70,9 +104,10 @@ public: * Loads a log record from the database. Memory is allocated by this class * and needs to be freed. * @param index of the associated logDB entry - * @return the LogDB record + * @param lr logDBrecored to load from the DB + * @return 0 on success -1 otherwise */ - LogDBRecord * get_log_record(unsigned int index); + int get_log_record(unsigned int index, LogDBRecord& lr); /** * Applies the SQL command of the given record to the database. The @@ -92,8 +127,7 @@ public: /** * Inserts a new log record in the database. This method should be used - * in FOLLOWER mode to replicate leader log. It updates next_index and - * last_term to evaluate vote requests. + * in FOLLOWER mode to replicate leader log. It does not update counters. * @param index for the record * @param term for the record * @param sql command of the record @@ -104,28 +138,12 @@ public: int insert_log_record(unsigned int index, unsigned int term, std::ostringstream& sql, time_t timestamp) { - int rc; - - pthread_mutex_lock(&mutex); - - rc = insert_replace(index, term, sql.str(), timestamp, false); - - if ( rc == 0 && term >= 0 && index >= 0 ) - { - next_index = index + 1; - - last_term = term; - } - - pthread_mutex_unlock(&mutex); - - return rc; + return insert_replace(index, term, sql.str(), timestamp, false); } //-------------------------------------------------------------------------- // Functions to manage the Raft state. Log record 0, term -1 // ------------------------------------------------------------------------- - /** * Stores the raft state in the log * @param raft attributes in XML format @@ -206,19 +224,13 @@ public: int setup_index(int& last_applied, int& last_index); /** - * Gets the index of the last record in the log + * Gets the index & term of the last record in the log * @param _i the index * @param _t the term + * + * @return 0 on success */ - void get_last_record_index(unsigned int& _i, unsigned int& _t) - { - pthread_mutex_lock(&mutex); - - _i = next_index - 1; - _t = last_term; - - pthread_mutex_unlock(&mutex); - } + int get_last_record_index(unsigned int& _i, unsigned int& _t); protected: int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet) @@ -249,11 +261,6 @@ private: */ unsigned int last_applied; - /** - * Term of the last entry added to the log - */ - unsigned int last_term; - /** * Max number of records to keep in the database */ @@ -268,21 +275,6 @@ private: static const char * db_bootstrap; - /** - * Callback to initialize the next_index and last_appled varibales. - */ - int setup_index_cb(void *nil, int num, char **values, char **names); - - /** - * SQL callback for log record SELECT commands - */ - int select_cb(void *req, int num, char **values, char **names); - - /** - * SQL callback for loading the raft state - */ - int raft_state_cb(void *str_value, int num, char **values, char **names); - /** * Inserts or update a log record in the database * @param index of the log entry diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index c59c0181ac..466f717cce 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -710,7 +710,7 @@ void RaftManager::send_heartbeat() void RaftManager::request_vote() { - unsigned int lindex, lterm, fterm; + unsigned int lindex, lterm, fterm, _term, _server_id; std::map _servers; std::map::iterator it; @@ -738,6 +738,8 @@ void RaftManager::request_vote() return; } + _servers = servers; + term = term + 1; votedfor = server_id; @@ -748,6 +750,9 @@ void RaftManager::request_vote() votes2go = num_servers / 2; + _term = term; + _server_id = server_id; + pthread_mutex_unlock(&mutex); logdb->insert_raft_state(raft_state_xml, true); @@ -756,7 +761,7 @@ void RaftManager::request_vote() for (it = _servers.begin(); it != _servers.end() ; ++it, oss.str("") ) { - if ( it->first == (unsigned int) server_id ) + if ( it->first == _server_id ) { continue; } @@ -765,27 +770,33 @@ void RaftManager::request_vote() if ( rc == -1 ) { - oss << "Error sending vote request to follower " << it->first <<": " - << error; - - NebulaLog::log("RCM", Log::INFO, oss); + NebulaLog::log("RCM", Log::INFO, error); } - else if ( success == false && fterm > term ) + else if ( success == false ) { - oss << "Follower " << it->first << " has a higher term, turning " - << "into follower"; + oss << "Vote not granted from follower " << it->first << ": " + << error; - NebulaLog::log("RCM", Log::INFO, oss); + NebulaLog::log("RCM", Log::INFO, oss); - follower(fterm); + if ( fterm > _term ) + { + oss.str(""); + oss << "Follower " << it->first << " is in term " << fterm + << " current term is "<< _term << ". Turning into follower"; - break; + NebulaLog::log("RCM", Log::INFO, oss); + + follower(fterm); + + break; + } } else if ( success == true ) { granted_votes++; - oss << "Got vote from server: " << it->first << ". Total votes: " + oss << "Got vote from follower " << it->first << ". Total votes: " << granted_votes; NebulaLog::log("RCM", Log::INFO, oss); @@ -811,8 +822,7 @@ void RaftManager::request_vote() { state = FOLLOWER; - last_heartbeat.tv_sec = 0; - last_heartbeat.tv_nsec= 0; + clock_gettime(CLOCK_REALTIME, &last_heartbeat); } pthread_mutex_unlock(&mutex); diff --git a/src/raft/ReplicaManager.cc b/src/raft/ReplicaManager.cc index 491eccd881..8555e91d9b 100644 --- a/src/raft/ReplicaManager.cc +++ b/src/raft/ReplicaManager.cc @@ -102,13 +102,14 @@ void ReplicaThread::do_replication() // --------------------------------------------------------------------- // Get parameters to call append entries on follower // --------------------------------------------------------------------- - int next_index = raftm->get_next_index(follower_id); - LogDBRecord * lr = logdb->get_log_record(next_index); + LogDBRecord lr; + + int next_index = raftm->get_next_index(follower_id); bool success = false; unsigned int follower_term = -1; - if ( lr == 0 ) + if ( logdb->get_log_record(next_index, lr) != 0 ) { ostringstream ess; @@ -119,11 +120,9 @@ void ReplicaThread::do_replication() continue; } - int xml_rc = raftm->xmlrpc_replicate_log(follower_id, lr, success, + int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, follower_term, error); - delete lr; - if ( xml_rc == -1 ) { NebulaLog::log("RCM", Log::ERROR, error); diff --git a/src/rm/RequestManager.cc b/src/rm/RequestManager.cc index 8a7ee69126..a1a11fb5ab 100644 --- a/src/rm/RequestManager.cc +++ b/src/rm/RequestManager.cc @@ -788,6 +788,7 @@ void RequestManager::register_xml_methods() xmlrpc_c::methodPtr zone_addserver(zone_addserver_pt); xmlrpc_c::methodPtr zone_delserver(zone_delserver_pt); xmlrpc_c::methodPtr zone_replicatelog(new ZoneReplicateLog()); + xmlrpc_c::methodPtr zone_voterequest(new ZoneVoteRequest()); xmlrpc_c::methodPtr zone_info(new ZoneInfo()); xmlrpc_c::methodPtr zonepool_info(new ZonePoolInfo()); @@ -798,6 +799,7 @@ void RequestManager::register_xml_methods() RequestManagerRegistry.addMethod("one.zone.info", zone_info); RequestManagerRegistry.addMethod("one.zone.rename", zone_rename); RequestManagerRegistry.addMethod("one.zone.replicate",zone_replicatelog); + RequestManagerRegistry.addMethod("one.zone.voterequest",zone_voterequest); RequestManagerRegistry.addMethod("one.zone.addserver", zone_addserver); RequestManagerRegistry.addMethod("one.zone.delserver", zone_delserver); diff --git a/src/rm/RequestManagerZone.cc b/src/rm/RequestManagerZone.cc index fbc3ab0bd4..57b500e950 100644 --- a/src/rm/RequestManagerZone.cc +++ b/src/rm/RequestManagerZone.cc @@ -139,7 +139,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList, unsigned int current_term = raftm->get_term(); - LogDBRecord * lr; + LogDBRecord lr, prev_lr; if ( att.uid != 0 ) { @@ -189,36 +189,28 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList, //REPLICATE if ( index > 0 ) { - lr = logdb->get_log_record(prev_index); - - if ( lr == 0 ) + if ( logdb->get_log_record(prev_index, prev_lr) != 0 ) { - att.resp_msg = "Previous log record missing"; + att.resp_msg = "Error loading previous log record"; att.resp_id = current_term; failure_response(ACTION, att); return; } - if ( lr->prev_term != prev_term ) + if ( prev_lr.term != prev_term ) { - delete lr; - att.resp_msg = "Previous log record missmatch"; att.resp_id = current_term; failure_response(ACTION, att); return; } - - delete lr; } - lr = logdb->get_log_record(index); - - if ( lr != 0 ) + if ( logdb->get_log_record(index, lr) != 0 ) { - if ( lr->term != term ) + if ( lr.term != term ) { logdb->delete_log_records(index); } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 96abe5a468..eed89d9e62 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -17,6 +17,7 @@ #include "LogDB.h" #include "Nebula.h" #include "ZoneServer.h" +#include "Callbackable.h" /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ @@ -50,34 +51,24 @@ LogDB::~LogDB() /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -int LogDB::setup_index_cb(void *int_value, int num, char **values, char **names) -{ - int * value = static_cast(int_value); - - if ( values[0] != 0 && num == 1 ) - { - *value = atoi(values[0]); - } - - return 0; -} - int LogDB::setup_index(int& _last_applied, int& _last_index) { - std::ostringstream oss; int rc = 0; + std::ostringstream oss; + + single_cb cb; + _last_applied = 0; _last_index = -1; - set_callback(static_cast(&LogDB::setup_index_cb), - static_cast(&_last_index)); + cb.set_callback(&_last_index); oss << "SELECT MAX(log_index) FROM logdb"; - rc += db->exec_rd(oss, this); + rc += db->exec_rd(oss, &cb); - unset_callback(); + cb.unset_callback(); if ( rc == 0 ) { @@ -90,14 +81,13 @@ int LogDB::setup_index(int& _last_applied, int& _last_index) oss.str(""); - set_callback(static_cast(&LogDB::setup_index_cb), - static_cast(&_last_applied)); + cb.set_callback(&_last_applied); oss << "SELECT MAX(log_index) FROM logdb WHERE timestamp != 0"; - rc += db->exec_rd(oss, this); + rc += db->exec_rd(oss, &cb); - unset_callback(); + cb.unset_callback(); if ( rc == 0 ) { @@ -114,39 +104,10 @@ int LogDB::setup_index(int& _last_applied, int& _last_index) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -int LogDB::select_cb(void *req, int num, char **values, char **names) -{ - if ( !values[0] || !values[1] || !values[2] || !values[3] || !values[4] - || !values[5] || num != 6 ) - { - return -1; - } - - LogDBRecord ** request = static_cast(req); - - *request = new LogDBRecord; - - (*request)->index = static_cast(atoi(values[0])); - (*request)->term = static_cast(atoi(values[1])); - (*request)->sql = values[2]; - - (*request)->timestamp = static_cast(atoi(values[3])); - - (*request)->prev_index = static_cast(atoi(values[4])); - (*request)->prev_term = static_cast(atoi(values[5])); - - return 0; -} - -/* -------------------------------------------------------------------------- */ -/* -------------------------------------------------------------------------- */ - -LogDBRecord * LogDB::get_log_record(unsigned int index) +int LogDB::get_log_record(unsigned int index, LogDBRecord& lr) { ostringstream oss; - LogDBRecord * request = 0; - int prev_index = index - 1; if ( prev_index < 0 ) @@ -158,43 +119,73 @@ LogDBRecord * LogDB::get_log_record(unsigned int index) << " FROM logdb c, logdb p WHERE c.log_index = " << index << " AND p.log_index = " << prev_index; - set_callback(static_cast(&LogDB::select_cb), - static_cast(&request)); + lr.set_callback(); - db->exec_rd(oss, this); + int rc = db->exec_rd(oss, &lr); - unset_callback(); + lr.unset_callback(); - return request; + return rc; } /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -int LogDB::raft_state_cb(void *str_value, int num, char **values, char **names) +int LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t) { - std::string * value = static_cast(str_value); - - if ( values[0] != 0 && num == 1 ) + class db_callback : public Callbackable { - *value = values[0]; - } + public: + int index; + int term; - return 0; + int cb(void *_db_cbk, int num, char **values, char **names) + { + if ( values == 0 || values[0] == 0 || values[1] == 0 || num != 2 ) + { + return -1; + } + + index = atoi(values[0]); + term = atoi(values[1]); + + return 0; + } + } db_cbk; + + std::ostringstream oss; + + db_cbk.set_callback(static_cast(&db_callback::cb)); + + oss << "SELECT log_index, term FROM logdb WHERE log_index=(SELECT " + << "MAX(log_index) FROM logdb)"; + + int rc = db->exec_rd(oss, &db_cbk); + + db_cbk.unset_callback(); + + _i = db_cbk.index; + _t = db_cbk.term; + + return rc; } +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + int LogDB::get_raft_state(std::string &raft_xml) { ostringstream oss; + single_cb cb; + oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1"; - set_callback(static_cast(&LogDB::raft_state_cb), - static_cast(&raft_xml)); + cb.set_callback(&raft_xml); - int rc = db->exec_rd(oss, this); + int rc = db->exec_rd(oss, &cb); - unset_callback(); + cb.unset_callback(); if ( raft_xml.empty() ) { @@ -219,16 +210,7 @@ int LogDB::insert_replace(int index, int term, const std::string& sql, return -1; } - if (replace) - { - oss << "REPLACE"; - } - else - { - oss << "INSERT"; - } - - oss << " INTO " << table << " ("<< db_names <<") VALUES (" + oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (" << index << "," << term << "," << "'" << sql_db << "'," @@ -283,8 +265,6 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql, next_index++; - last_term = term; - pthread_mutex_unlock(&mutex); return index; @@ -394,17 +374,15 @@ int LogDB::apply_log_records(unsigned int commit_index) while (last_applied < commit_index ) { - LogDBRecord * lr = get_log_record(last_applied + 1); + LogDBRecord lr; - if ( lr == 0 ) + if ( get_log_record(last_applied + 1, lr) != 0 ) { pthread_mutex_unlock(&mutex); return -1; } - rc = apply_log_record(lr); - - delete lr; + rc = apply_log_record(&lr); if ( rc != 0 ) {