mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-21 18:03:38 +03:00
F #4809: Solves some bugs
This commit is contained in:
parent
7c479b8ecb
commit
900c37fdf5
@ -94,6 +94,8 @@ public:
|
|||||||
|
|
||||||
RaftManager(bool solo):term(0), commit(0), applied(0)
|
RaftManager(bool solo):term(0), commit(0), applied(0)
|
||||||
{
|
{
|
||||||
|
pthread_mutex_init(&mutex, 0);
|
||||||
|
|
||||||
if ( solo )
|
if ( solo )
|
||||||
{
|
{
|
||||||
state = SOLO;
|
state = SOLO;
|
||||||
@ -343,194 +345,21 @@ private:
|
|||||||
|
|
||||||
void replicate_log_action(const RaftAction& ra);
|
void replicate_log_action(const RaftAction& ra);
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Log entry replicated on follower
|
// Log entry replicated on follower
|
||||||
// - Increment next entry to send to follower
|
// - Increment next entry to send to follower
|
||||||
// - Update match entry on follower
|
// - Update match entry on follower
|
||||||
// - Evaluate majority to apply changes to DB
|
// - Evaluate majority to apply changes to DB
|
||||||
// ---------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
void replicate_success_action(const RaftAction& ra);
|
void replicate_success_action(const RaftAction& ra);
|
||||||
|
|
||||||
|
//--------------------------------------------------------------------------
|
||||||
|
// Log inconsistency in follower
|
||||||
|
// - Decrease follower index
|
||||||
|
// - Retry (do not wait for replica events)
|
||||||
|
//--------------------------------------------------------------------------
|
||||||
void replicate_failure_action(const RaftAction& ra);
|
void replicate_failure_action(const RaftAction& ra);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /*RAFT_MANAGER_H_*/
|
#endif /*RAFT_MANAGER_H_*/
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
zone = zpool->get(zone_id, true);
|
|
||||||
|
|
||||||
if ( zone == 0 )
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ZoneServer * follower = zone->get_server(follower_id);
|
|
||||||
ZoneServer * leader = zone->get_server(leader_id);
|
|
||||||
|
|
||||||
if ( follower == 0 )
|
|
||||||
{
|
|
||||||
zone->unlock();
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
follower->inc_next();
|
|
||||||
|
|
||||||
follower->set_match(id);
|
|
||||||
|
|
||||||
if ( leader->get_applied() > follower->get_next() )
|
|
||||||
{
|
|
||||||
_pending_requests = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
zone->unlock();
|
|
||||||
|
|
||||||
LogDBRequest * lr = logdb->get_request(id);
|
|
||||||
|
|
||||||
if ( lr == 0 )
|
|
||||||
{
|
|
||||||
oss.str("");
|
|
||||||
|
|
||||||
lr->lock();
|
|
||||||
|
|
||||||
oss << "Log entry " << id << "-" << term << "replicated"
|
|
||||||
<< " on server: " << follower_id << ". Total "
|
|
||||||
<< "replicas: " << lr->replicas() << " Replicas to "
|
|
||||||
<< "majority: " << lr->to_commit();
|
|
||||||
|
|
||||||
lr->replicated();
|
|
||||||
|
|
||||||
lr->unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
int follower_term = xmlrpc_c::value_boolean(values[1]);
|
|
||||||
|
|
||||||
if ( follower_term > term )
|
|
||||||
{
|
|
||||||
//------------------------------------------------------
|
|
||||||
// Convert to follower
|
|
||||||
// - Update term
|
|
||||||
// - Set state to follower
|
|
||||||
// - Stop replica threads
|
|
||||||
//------------------------------------------------------
|
|
||||||
ostringstream ess;
|
|
||||||
|
|
||||||
ess << "Detected a higher term on follower: "
|
|
||||||
<< follower_id << " giving up leadership";
|
|
||||||
|
|
||||||
NebulaLog::log("DBM", Log::WARNING, ess);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
//------------------------------------------------------
|
|
||||||
// Log inconsistency in follower
|
|
||||||
// - Decrease follower index
|
|
||||||
// - Retry (do not wait for replica events)
|
|
||||||
//------------------------------------------------------
|
|
||||||
ostringstream ess;
|
|
||||||
|
|
||||||
ess << "Log inconsistency detected on follower: "
|
|
||||||
<< follower_id;
|
|
||||||
|
|
||||||
NebulaLog::log("DBM", Log::WARNING, ess);
|
|
||||||
|
|
||||||
zone = zpool->get(zone_id, true);
|
|
||||||
|
|
||||||
if ( zone == 0 )
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ZoneServer * follower = zone->get_server(follower_id);
|
|
||||||
|
|
||||||
if ( follower == 0 )
|
|
||||||
{
|
|
||||||
zone->unlock();
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
follower->dec_next();
|
|
||||||
|
|
||||||
zone->unlock();
|
|
||||||
|
|
||||||
_pending_requests = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else //RPC failed, will retry on next replication request
|
|
||||||
{
|
|
||||||
ostringstream ess;
|
|
||||||
|
|
||||||
xmlrpc_c::fault failure = rpc_client.getFault();
|
|
||||||
|
|
||||||
ess << "Error replicating log entry " << id << "-" << term
|
|
||||||
<< " on follower " << follower_id << ": "
|
|
||||||
<< failure.getDescription();
|
|
||||||
|
|
||||||
NebulaLog::log("DBM", Log::ERROR, ess);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (exception const& e)
|
|
||||||
{
|
|
||||||
ostringstream ess;
|
|
||||||
|
|
||||||
ess << "Error replicating log entry " << id << "-" << term
|
|
||||||
<< " on follower " << follower_id << ": " << e.what();
|
|
||||||
|
|
||||||
NebulaLog::log("DBM", Log::ERROR, ess);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
std::map<unsigned int, LogDBRequest *>::iterator it;
|
|
||||||
|
|
||||||
for ( it = requests.begin(); it != requests.end(); ++it )
|
|
||||||
{
|
|
||||||
delete it->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
LogDBRecord * LogDB::get_logrecord(unsigned int index);
|
|
||||||
LogDBRequest * LogDB::get_request(unsigned int index)
|
|
||||||
{
|
|
||||||
std::map<unsigned int, LogDBRequest *>::iterator it;
|
|
||||||
|
|
||||||
LogDBRequest * req = 0;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&mutex);
|
|
||||||
|
|
||||||
it = requests.find(index);
|
|
||||||
|
|
||||||
if ( it != requests.end() )
|
|
||||||
{
|
|
||||||
req = it->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
|
||||||
|
|
||||||
if ( req == 0 )
|
|
||||||
{
|
|
||||||
LogDBRequest * req = select(index);
|
|
||||||
|
|
||||||
if ( req != 0 )
|
|
||||||
{
|
|
||||||
pthread_mutex_lock(&mutex);
|
|
||||||
|
|
||||||
requests.insert(std::make_pair(index, req));
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return req;
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
@ -888,7 +888,7 @@ void Nebula::start(bool bootstrap_only)
|
|||||||
// ---- Raft Manager ----
|
// ---- Raft Manager ----
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
raftm = new RaftManager(server_id != -1);
|
raftm = new RaftManager(server_id == -1);
|
||||||
}
|
}
|
||||||
catch (bad_alloc&)
|
catch (bad_alloc&)
|
||||||
{
|
{
|
||||||
@ -994,7 +994,6 @@ void Nebula::start(bool bootstrap_only)
|
|||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// ---- Initialize Manager cross-reference pointers and pool references ----
|
// ---- Initialize Manager cross-reference pointers and pool references ----
|
||||||
|
|
||||||
dm->init_managers();
|
dm->init_managers();
|
||||||
@ -1023,12 +1022,11 @@ void Nebula::start(bool bootstrap_only)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
///////////////
|
||||||
|
/////////DEBUG
|
||||||
|
raftm->leader_trigger(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
///////////////
|
|
||||||
/////////DEBUG
|
|
||||||
raftm->leader_trigger(0);
|
|
||||||
|
|
||||||
// -----------------------------------------------------------
|
// -----------------------------------------------------------
|
||||||
// Wait for a SIGTERM or SIGINT signal
|
// Wait for a SIGTERM or SIGINT signal
|
||||||
|
@ -111,6 +111,16 @@ void RaftManager::leader_action(const RaftAction& ra)
|
|||||||
|
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&mutex);
|
||||||
|
|
||||||
|
if ( state != FOLLOWER )
|
||||||
|
{
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
NebulaLog::log("RCM", Log::INFO, "Becoming leader of zone");
|
NebulaLog::log("RCM", Log::INFO, "Becoming leader of zone");
|
||||||
|
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
@ -194,6 +204,12 @@ void RaftManager::replicate_log_action(const RaftAction& ra)
|
|||||||
{
|
{
|
||||||
pthread_mutex_lock(&mutex);
|
pthread_mutex_lock(&mutex);
|
||||||
|
|
||||||
|
if ( state != LEADER )
|
||||||
|
{
|
||||||
|
pthread_mutex_unlock(&mutex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
ReplicaRequest * request = ra.request();
|
ReplicaRequest * request = ra.request();
|
||||||
|
|
||||||
requests.insert(std::make_pair(request->index(), request));
|
requests.insert(std::make_pair(request->index(), request));
|
||||||
|
@ -67,7 +67,7 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
|
|||||||
|
|
||||||
set_callback(static_cast<Callbackable::Callback>(&LogDB::setup_index_cb));
|
set_callback(static_cast<Callbackable::Callback>(&LogDB::setup_index_cb));
|
||||||
|
|
||||||
oss << "SELECT MAX(i.log_index) MAX(j.log_index) FROM logdb i, "
|
oss << "SELECT MAX(i.log_index), MAX(j.log_index) FROM logdb i, "
|
||||||
<< "(SELECT log_index AS log_index FROM logdb WHERE timestamp != 0) j";
|
<< "(SELECT log_index AS log_index FROM logdb WHERE timestamp != 0) j";
|
||||||
|
|
||||||
int rc = db->exec_rd(oss,this);
|
int rc = db->exec_rd(oss,this);
|
||||||
@ -244,12 +244,22 @@ int LogDB::exec_wr(ostringstream& cmd)
|
|||||||
|
|
||||||
RaftManager * raftm = Nebula::instance().get_raftm();
|
RaftManager * raftm = Nebula::instance().get_raftm();
|
||||||
|
|
||||||
unsigned int term = raftm->get_term();
|
unsigned int term = 0;
|
||||||
|
|
||||||
|
bool solo = true;
|
||||||
|
bool leader = true;
|
||||||
|
|
||||||
|
if ( raftm != 0 ) // == 0 during first bootstrap
|
||||||
|
{
|
||||||
|
term = raftm->get_term();
|
||||||
|
solo = raftm->is_solo();
|
||||||
|
leader = raftm->is_leader();
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// OpenNebula was started in solo mode
|
// OpenNebula was started in solo mode
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
if ( raftm->is_solo() )
|
if ( solo )
|
||||||
{
|
{
|
||||||
if ( insert_log_record(term, cmd, time(0)) == -1 )
|
if ( insert_log_record(term, cmd, time(0)) == -1 )
|
||||||
{
|
{
|
||||||
@ -258,7 +268,7 @@ int LogDB::exec_wr(ostringstream& cmd)
|
|||||||
|
|
||||||
return db->exec_wr(cmd);
|
return db->exec_wr(cmd);
|
||||||
}
|
}
|
||||||
else if ( !raftm->is_leader() )
|
else if ( !leader )
|
||||||
{
|
{
|
||||||
NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");
|
NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");
|
||||||
return -1;
|
return -1;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user