mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-30 22:50:10 +03:00
Raft stability patch:
* Do not reset vote on followers upon successful election * Cache one_auth secret to fix mt issue and improve heartbeat performance * Remove debug message when a record is not loaded * Remove default timeout for curl library
This commit is contained in:
parent
a3b21a0a86
commit
a6addb314e
@ -156,6 +156,11 @@ private:
|
||||
*/
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
/**
|
||||
* Secret to use in xmlrpc API calls
|
||||
*/
|
||||
std::string xmlrpc_secret;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Synchronization variables
|
||||
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
|
||||
|
@ -292,6 +292,11 @@ private:
|
||||
*/
|
||||
std::map<int, ReplicaRequest *> requests;
|
||||
|
||||
/**
|
||||
* Secret to use in xmlrpc API calls
|
||||
*/
|
||||
std::string xmlrpc_secret;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Raft state
|
||||
// -------------------------------------------------------------------------
|
||||
|
@ -157,7 +157,7 @@ RAFT = [
|
||||
LOG_PURGE_TIMEOUT = 600,
|
||||
ELECTION_TIMEOUT_MS = 2500,
|
||||
BROADCAST_TIMEOUT_MS = 500,
|
||||
XMLRPC_TIMEOUT_MS = 2000
|
||||
XMLRPC_TIMEOUT_MS = 0
|
||||
]
|
||||
|
||||
# Executed when a server transits from follower->leader
|
||||
|
@ -77,7 +77,7 @@ int Client::read_oneauth(string &secret, string& error_msg)
|
||||
string one_auth_file;
|
||||
ifstream file;
|
||||
|
||||
const char * one_auth_env = getenv("ONE_AUTH");
|
||||
const char * one_auth_env = getenv("ONE_AUTH");
|
||||
|
||||
if (!one_auth_env) //No $ONE_AUTH, read $HOME/.one/one_auth
|
||||
{
|
||||
|
@ -29,9 +29,16 @@ const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
|
||||
|
||||
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
|
||||
{
|
||||
std::string error;
|
||||
|
||||
pthread_mutex_init(&mutex, 0);
|
||||
|
||||
am.addListener(this);
|
||||
|
||||
if ( Client::read_oneauth(xmlrpc_secret, error) == -1 )
|
||||
{
|
||||
throw runtime_error(error);
|
||||
}
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
@ -391,7 +398,7 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
|
||||
{
|
||||
static const std::string replica_method = "one.zone.fedreplicate";
|
||||
|
||||
std::string secret, zedp;
|
||||
std::string zedp;
|
||||
|
||||
int xml_rc = 0;
|
||||
|
||||
@ -407,15 +414,10 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
|
||||
// -------------------------------------------------------------------------
|
||||
// Get parameters to call append entries on follower
|
||||
// -------------------------------------------------------------------------
|
||||
if ( Client::read_oneauth(secret, error) == -1 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
xmlrpc_c::value result;
|
||||
xmlrpc_c::paramList replica_params;
|
||||
|
||||
replica_params.add(xmlrpc_c::value_string(secret));
|
||||
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
|
||||
replica_params.add(xmlrpc_c::value_int(lr.index));
|
||||
replica_params.add(xmlrpc_c::value_int(prev_index));
|
||||
replica_params.add(xmlrpc_c::value_string(lr.sql));
|
||||
|
@ -54,12 +54,17 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
|
||||
Nebula& nd = Nebula::instance();
|
||||
LogDB * logdb = nd.get_logdb();
|
||||
|
||||
std::string raft_xml, cmd, arg;
|
||||
std::string raft_xml, cmd, arg, error;
|
||||
|
||||
pthread_mutex_init(&mutex, 0);
|
||||
|
||||
am.addListener(this);
|
||||
|
||||
if ( Client::read_oneauth(xmlrpc_secret, error) == -1 )
|
||||
{
|
||||
throw runtime_error(error);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Initialize Raft variables:
|
||||
// - state
|
||||
@ -481,18 +486,20 @@ void RaftManager::follower(unsigned int _term)
|
||||
|
||||
state = FOLLOWER;
|
||||
|
||||
term = _term;
|
||||
votedfor = -1;
|
||||
if ( _term > term )
|
||||
{
|
||||
term = _term;
|
||||
votedfor = -1;
|
||||
|
||||
commit = lapplied;
|
||||
raft_state.replace("VOTEDFOR", votedfor);
|
||||
raft_state.replace("TERM", term);
|
||||
|
||||
raft_state.to_xml(raft_state_xml);
|
||||
}
|
||||
|
||||
commit = lapplied;
|
||||
leader_id = -1;
|
||||
|
||||
raft_state.replace("VOTEDFOR", votedfor);
|
||||
raft_state.replace("TERM", term);
|
||||
|
||||
raft_state.to_xml(raft_state_xml);
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode");
|
||||
|
||||
std::map<int, ReplicaRequest *>::iterator it;
|
||||
@ -518,7 +525,10 @@ void RaftManager::follower(unsigned int _term)
|
||||
frm->stop_replica_threads();
|
||||
}
|
||||
|
||||
logdb->update_raft_state(raft_state_xml);
|
||||
if (!raft_state_xml.empty())
|
||||
{
|
||||
logdb->update_raft_state(raft_state_xml);
|
||||
}
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
@ -982,7 +992,6 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
|
||||
|
||||
static const std::string replica_method = "one.zone.replicate";
|
||||
|
||||
std::string secret;
|
||||
std::string follower_edp;
|
||||
|
||||
std::map<int, std::string>::iterator it;
|
||||
@ -1012,16 +1021,10 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
|
||||
// -------------------------------------------------------------------------
|
||||
// Get parameters to call append entries on follower
|
||||
// -------------------------------------------------------------------------
|
||||
if ( Client::read_oneauth(secret, error) == -1 )
|
||||
{
|
||||
NebulaLog::log("RRM", Log::ERROR, error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
xmlrpc_c::value result;
|
||||
xmlrpc_c::paramList replica_params;
|
||||
|
||||
replica_params.add(xmlrpc_c::value_string(secret));
|
||||
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
|
||||
replica_params.add(xmlrpc_c::value_int(_server_id));
|
||||
replica_params.add(xmlrpc_c::value_int(_commit));
|
||||
replica_params.add(xmlrpc_c::value_int(_term));
|
||||
@ -1080,7 +1083,6 @@ int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
|
||||
|
||||
static const std::string replica_method = "one.zone.voterequest";
|
||||
|
||||
std::string secret;
|
||||
std::string follower_edp;
|
||||
|
||||
std::map<int, std::string>::iterator it;
|
||||
@ -1109,16 +1111,10 @@ int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
|
||||
// -------------------------------------------------------------------------
|
||||
// Get parameters to call append entries on follower
|
||||
// -------------------------------------------------------------------------
|
||||
if ( Client::read_oneauth(secret, error) == -1 )
|
||||
{
|
||||
NebulaLog::log("RRM", Log::ERROR, error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
xmlrpc_c::value result;
|
||||
xmlrpc_c::paramList replica_params;
|
||||
|
||||
replica_params.add(xmlrpc_c::value_string(secret));
|
||||
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
|
||||
replica_params.add(xmlrpc_c::value_int(_term));
|
||||
replica_params.add(xmlrpc_c::value_int(_server_id));
|
||||
replica_params.add(xmlrpc_c::value_int(lindex));
|
||||
|
@ -199,14 +199,6 @@ int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
|
||||
|
||||
if ( lr.index != index )
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
oss << "Log record " << index << " loaded incorrectly. Record index: "
|
||||
<< lr.index << " fed. index: " << lr.fed_index << " sql command: "
|
||||
<< lr.sql << ". Operation return code: " << rc;
|
||||
|
||||
NebulaLog::log("DBM", Log::ERROR, oss);
|
||||
|
||||
rc = -1;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user