1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-02-23 21:57:43 +03:00

Better trigger of replication requests. Synchronize DB writers and Raft
main thread.

On followers, replace existing records when matching index and term to
cope with API reset because of timeouts.

(cherry picked from commit 7719e7c17be7a316b0b6e2e2741f461553e32753)
(cherry picked from commit 549087250796e015a3b98a9b6c44b86b10da4fab)
(cherry picked from commit 2f281b51f38956a59c10ee174e70b14f97e145a5)
(cherry picked from commit ccdbb99e9dcf439db8a0c724b72cda7a55f01312)
This commit is contained in:
Ruben S. Montero 2018-07-30 13:14:14 +02:00
parent 066849c20f
commit 914368d300
6 changed files with 207 additions and 40 deletions

View File

@ -120,11 +120,13 @@ public:
* @param sql command of the record
* @param timestamp associated to this record
* @param fed_index index in the federation -1 if not federated
* @param replace if true will replace the record if it exists
*
* @return -1 on failure, index of the inserted record on success
*/
int insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index);
std::ostringstream& sql, time_t timestamp, int fed_index,
bool replace);
/**
* Replicate a log record on followers. It will also replicate any missing
@ -350,10 +352,12 @@ private:
* @param sql command to modify DB state
* @param ts timestamp of record application to DB state
* @param fi the federated index -1 if none
* @param replace if true will replace the record if it exists
*
* @return 0 on success
*/
int insert(int index, int term, const std::string& sql, time_t ts, int fi);
int insert(int index, int term, const std::string& sql, time_t ts, int fi,
bool replace);
/**
* Inserts a new log record in the database. If the record is successfully

View File

@ -62,6 +62,8 @@ public:
{
delete leader_hook;
delete follower_hook;
pthread_mutex_destroy(&mutex);
};
// -------------------------------------------------------------------------
@ -89,6 +91,15 @@ public:
*/
void replicate_log(ReplicaRequest * rr);
/**
 * Allocate a replica request for the given index. This simply forwards
 * the index to the allocate() method of the requests member.
 * @param rindex index of the record for the request
 */
void replicate_allocate(int rindex)
{
requests.allocate(rindex);
}
/**
* Finalizes the Raft Consensus Manager
*/
@ -312,8 +323,7 @@ private:
/**
* Clients waiting for a log replication
*/
std::map<int, ReplicaRequest *> requests;
ReplicaRequestMap requests;
// -------------------------------------------------------------------------
// Raft state

View File

@ -37,7 +37,7 @@ public:
* the client is notified
* @return number of replicas for this log
*/
int inc_replicas()
int add_replica()
{
int __replicas;
@ -102,5 +102,163 @@ private:
int _replicas;
};
/**
* This class represents a map of replication requests. It syncs access between
* RaftManager and DB writer threads. A DB writer allocates and set the
* request and then it waits on it for completion.
*/
class ReplicaRequestMap
{
public:
ReplicaRequestMap()
{
pthread_mutex_init(&mutex, 0);
};
virtual ~ReplicaRequestMap()
{
pthread_mutex_destroy(&mutex);
}
/**
* Increments the number of replicas of this request. If it can be
* committed the request is removed from the map
* @param rindex of the request
*
* @return the number of replicas to commit, if 0 it can be committed
*/
int add_replica(int rindex)
{
int to_commit = -1;
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() && it->second != 0 )
{
it->second->add_replica();
to_commit = it->second->to_commit();
if ( to_commit == 0 )
{
requests.erase(it);
}
}
pthread_mutex_unlock(&mutex);
return to_commit;
}
/**
* Allocated an empty replica request. It marks a writer thread will wait
* on this request.
* @param rindex of the request
*/
void allocate(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0));
}
pthread_mutex_unlock(&mutex);
}
/**
* Set the replication request associated to this index. If there is no
* previous request associated to the index it is created.
* @param rindex of the request
* @param rr replica request pointer
*/
void set(int rindex, ReplicaRequest * rr)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
requests.insert(std::make_pair(rindex, rr));
}
else if ( it->second == 0 )
{
it->second = rr;
}
pthread_mutex_unlock(&mutex);
}
/**
* Notify all writers and clear the replica map
*/
void clear()
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it;
for ( it = requests.begin() ; it != requests.end() ; ++it )
{
if ( it->second == 0 )
{
continue;
}
it->second->result = false;
it->second->timeout = false;
it->second->message = "oned is now follower";
it->second->notify();
}
requests.clear();
pthread_mutex_unlock(&mutex);
}
/**
* @return true if a replica request needs to be replicated
* - last DB index is greater than current replicated index
* - there is no allocation in progress for the next rindex
*/
bool need_replica(int db_index, int rindex)
{
if ( db_index <= rindex )
{
return false;
}
pthread_mutex_lock(&mutex);
std::map<int,ReplicaRequest *>::iterator it = requests.find(rindex + 1);
bool rc = true;
if ( it != requests.end() && it->second == 0 )
{
rc = false;
}
pthread_mutex_unlock(&mutex);
return rc;
}
private:
pthread_mutex_t mutex;
/**
* Clients waiting for a log replication
*/
std::map<int, ReplicaRequest *> requests;
};
#endif /*REPLICA_REQUEST_H_*/

View File

@ -74,7 +74,7 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
bsr << "bootstrap state";
logdb->insert_log_record(-1, -1, bsr, 0, -1);
logdb->insert_log_record(-1, -1, bsr, 0, -1, false);
raft_state.replace("TERM", 0);
raft_state.replace("VOTEDFOR", -1);
@ -547,17 +547,6 @@ void RaftManager::follower(unsigned int _term)
NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode");
std::map<int, ReplicaRequest *>::iterator it;
for ( it = requests.begin() ; it != requests.end() ; ++it )
{
it->second->result = false;
it->second->timeout= false;
it->second->message= "oned is now follower";
it->second->notify();
}
next.clear();
match.clear();
@ -610,7 +599,7 @@ void RaftManager::replicate_log(ReplicaRequest * request)
{
request->to_commit(num_servers / 2 );
requests.insert(std::make_pair(request->index(), request));
requests.set(request->index(), request);
}
if ( num_servers > 1 )
@ -634,9 +623,9 @@ void RaftManager::replicate_success(int follower_id)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
unsigned int db_last_index, db_last_term;
unsigned int db_lindex, db_lterm;
logdb->get_last_record_index(db_last_index, db_last_term);
logdb->get_last_record_index(db_lindex, db_lterm);
pthread_mutex_lock(&mutex);
@ -654,21 +643,12 @@ void RaftManager::replicate_success(int follower_id)
match_it->second = replicated_index;
next_it->second = replicated_index + 1;
it = requests.find(replicated_index);
if ( it != requests.end() )
if ( requests.add_replica(replicated_index) == 0 )
{
it->second->inc_replicas();
if ( it->second->to_commit() == 0 )
{
requests.erase(it);
commit = replicated_index;
}
commit = replicated_index;
}
if ((db_last_index > replicated_index) && (state == LEADER))
if ( requests.need_replica(db_lindex, replicated_index) && state == LEADER )
{
replica_manager.replicate(follower_id);
}

View File

@ -425,7 +425,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
ostringstream sql_oss(sql);
if ( logdb->insert_log_record(index, term, sql_oss, 0, fed_index) != 0 )
if (logdb->insert_log_record(index, term, sql_oss, 0, fed_index, true) != 0)
{
att.resp_msg = "Error writing log record";
att.resp_id = current_term;

View File

@ -120,7 +120,7 @@ LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret, unsigned int _lp):
oss << time(0);
insert_log_record(0, 0, oss, time(0), -1);
insert_log_record(0, 0, oss, time(0), -1, false);
}
setup_index(r, i);
@ -288,7 +288,7 @@ int LogDB::update_raft_state(std::string& raft_xml)
/* -------------------------------------------------------------------------- */
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
int fed_index)
int fed_index, bool replace)
{
std::ostringstream oss;
@ -310,7 +310,16 @@ int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
return -1;
}
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
if (replace)
{
oss << "REPLACE";
}
else
{
oss << "INSERT";
}
oss << " INTO " << table << " ("<< db_names <<") VALUES ("
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp
<< "," << fed_index << ")";
@ -387,7 +396,7 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
_fed_index = fed_index;
}
if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
if ( insert(index, term, sql.str(), timestamp, _fed_index, false) != 0 )
{
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
@ -396,6 +405,12 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
return -1;
}
//allocate a replication request if log record is going to be replicated
if ( timestamp == 0 )
{
Nebula::instance().get_raftm()->replicate_allocate(next_index);
}
last_index = next_index;
last_term = term;
@ -416,13 +431,13 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index)
std::ostringstream& sql, time_t timestamp, int fed_index, bool replace)
{
int rc;
pthread_mutex_lock(&mutex);
rc = insert(index, term, sql.str(), timestamp, fed_index);
rc = insert(index, term, sql.str(), timestamp, fed_index, replace);
if ( rc == 0 )
{