mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-08 21:17:43 +03:00
Feature #2722: Minor changes and data type fixes
This commit is contained in:
parent
c64d63f3d5
commit
e6c785b954
@ -119,10 +119,10 @@ public:
|
||||
* @param term for the record
|
||||
* @param sql command of the record
|
||||
* @param timestamp associated to this record
|
||||
* @param fed_index index in the federation -1 if not federated
|
||||
* @param fed_index index in the federation UINT64_MAX if not federated
|
||||
* @param replace if true will replace the record if it exists
|
||||
*
|
||||
* @return -1 on failure, index of the inserted record on success
|
||||
* @return 0 on sucess, -1 on failure
|
||||
*/
|
||||
int insert_log_record(uint64_t index, unsigned int term,
|
||||
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
|
||||
@ -337,8 +337,8 @@ private:
|
||||
* Replicates writes in the followers and apply changes to DB state once
|
||||
* it is safe to do so.
|
||||
*
|
||||
* @param federated -1 not federated (fed_index = -1), 0 generate fed index
|
||||
* (fed_index = index), > 0 set (fed_index = federated)
|
||||
* @param federated UINT64_MAX not federated (fed_index = UINT64_MAX), 0
|
||||
* generate fed index (fed_index = index), > 0 set (fed_index = federated)
|
||||
*/
|
||||
int _exec_wr(ostringstream& cmd, uint64_t federated);
|
||||
|
||||
@ -360,8 +360,8 @@ private:
|
||||
*
|
||||
* @return 0 on success
|
||||
*/
|
||||
int insert(uint64_t index, int term, const std::string& sql, time_t ts,
|
||||
uint64_t fi, bool replace);
|
||||
int insert(uint64_t index, unsigned int term, const std::string& sql,
|
||||
time_t ts, uint64_t fi, bool replace);
|
||||
|
||||
/**
|
||||
* Inserts a new log record in the database. If the record is successfully
|
||||
@ -373,7 +373,7 @@ private:
|
||||
*
|
||||
* @return -1 on failure, index of the inserted record on success
|
||||
*/
|
||||
uint64_t insert_log_record(uint64_t term, std::ostringstream& sql,
|
||||
uint64_t insert_log_record(unsigned int term, std::ostringstream& sql,
|
||||
time_t timestamp, uint64_t fed_index);
|
||||
};
|
||||
|
||||
|
@ -222,12 +222,12 @@ public:
|
||||
/**
|
||||
* Get next index to send to the follower
|
||||
* @param follower server id
|
||||
* @return -1 on failure, the next index if success
|
||||
* @return UINT64_MAX on failure, the next index if success
|
||||
*/
|
||||
uint64_t get_next_index(int follower_id)
|
||||
{
|
||||
std::map<int, uint64_t>::iterator it;
|
||||
uint64_t _index = -1;
|
||||
uint64_t _index = UINT64_MAX;
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
|
@ -313,14 +313,6 @@ protected:
|
||||
*/
|
||||
void failure_response(ErrorCode ec, RequestAttributes& ra);
|
||||
|
||||
/**
|
||||
* Builds an XML-RPC response updating retval. After calling this function
|
||||
* the xml-rpc excute method should return. A descriptive error message
|
||||
* is constructed for DB replications erros.
|
||||
* @param att the specific request attributes
|
||||
*/
|
||||
void failure_response_replication(RequestAttributes& att);
|
||||
|
||||
/**
|
||||
* Builds an error response. A descriptive error message
|
||||
* is constructed using att.resp_obj, att.resp_id and/or att.resp_msg and
|
||||
|
@ -442,7 +442,7 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
|
||||
else
|
||||
{
|
||||
error = xmlrpc_c::value_string(values[1]);
|
||||
last = xmlrpc_c::value_i8(values[3]);
|
||||
last = xmlrpc_c::value_i8(values[4]);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -459,3 +459,4 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
|
||||
|
||||
return xml_rc;
|
||||
}
|
||||
|
||||
|
@ -598,9 +598,7 @@ void RaftManager::replicate_log(ReplicaRequest * request)
|
||||
|
||||
for (it = next.begin(); it != next.end() ; ++it)
|
||||
{
|
||||
uint64_t rindex = request->index();
|
||||
|
||||
if ( rindex < (uint64_t) it->second )
|
||||
if ( request->index() < it->second )
|
||||
{
|
||||
to_commit--;
|
||||
}
|
||||
@ -1059,7 +1057,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
|
||||
{
|
||||
int _server_id;
|
||||
uint64_t _commit;
|
||||
int _term;
|
||||
unsigned int _term;
|
||||
std::string xmlrpc_secret;
|
||||
|
||||
static const std::string replica_method = "one.zone.replicate";
|
||||
@ -1156,7 +1154,7 @@ int RaftManager::xmlrpc_request_vote(int follower_id, uint64_t lindex,
|
||||
std::string& error)
|
||||
{
|
||||
int _server_id;
|
||||
int _term;
|
||||
unsigned int _term;
|
||||
std::string xmlrpc_secret;
|
||||
|
||||
static const std::string replica_method = "one.zone.voterequest";
|
||||
|
@ -186,6 +186,18 @@ int RaftReplicaThread::replicate()
|
||||
|
||||
uint64_t next_index = raftm->get_next_index(follower_id);
|
||||
|
||||
if ( next_index == UINT64_MAX )
|
||||
{
|
||||
ostringstream ess;
|
||||
|
||||
ess << "Failed to get next replica index for follower: " << follower_id;
|
||||
|
||||
NebulaLog::log("RCM", Log::ERROR, ess);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
if ( logdb->get_log_record(next_index, lr) != 0 )
|
||||
{
|
||||
ostringstream ess;
|
||||
|
@ -738,20 +738,6 @@ void Request::failure_response(ErrorCode ec, const string& str_val,
|
||||
*(att.retval) = arrayresult;
|
||||
}
|
||||
|
||||
void Request::failure_response_replication(RequestAttributes& att)
|
||||
{
|
||||
vector<xmlrpc_c::value> arrayData;
|
||||
|
||||
arrayData.push_back(xmlrpc_c::value_boolean(false));
|
||||
arrayData.push_back(xmlrpc_c::value_string(att.resp_msg));
|
||||
arrayData.push_back(xmlrpc_c::value_int(REPLICATION));
|
||||
arrayData.push_back(xmlrpc_c::value_i8(att.replication_idx));
|
||||
|
||||
xmlrpc_c::value_array arrayresult(arrayData);
|
||||
|
||||
*(att.retval) = arrayresult;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
string Request::failure_message(ErrorCode ec, RequestAttributes& att)
|
||||
@ -828,7 +814,16 @@ string Request::failure_message(ErrorCode ec, RequestAttributes& att)
|
||||
}
|
||||
break;
|
||||
case REPLICATION:
|
||||
oss << "Error replicating log entry " << att.replication_idx << ".";
|
||||
oss << "Error replicating log entry " << att.replication_idx;
|
||||
|
||||
if (att.resp_msg.empty())
|
||||
{
|
||||
oss << ".";
|
||||
}
|
||||
else
|
||||
{
|
||||
oss << ": " << att.resp_msg << ".";
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -600,7 +600,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
|
||||
att.resp_msg = oss.str();
|
||||
att.replication_idx = UINT64_MAX;
|
||||
|
||||
failure_response_replication(att);
|
||||
failure_response(REPLICATION, att);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -613,7 +613,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
|
||||
att.resp_msg = oss.str();
|
||||
att.replication_idx = UINT64_MAX;
|
||||
|
||||
failure_response_replication(att);
|
||||
failure_response(REPLICATION, att);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -632,7 +632,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
|
||||
att.resp_msg = oss.str();
|
||||
att.replication_idx = index;
|
||||
|
||||
failure_response_replication(att);
|
||||
failure_response(REPLICATION, att);
|
||||
}
|
||||
else // rc == last_index in log
|
||||
{
|
||||
@ -643,7 +643,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
|
||||
att.resp_msg = oss.str();
|
||||
att.replication_idx = rc;
|
||||
|
||||
failure_response_replication(att);
|
||||
failure_response(REPLICATION, att);
|
||||
}
|
||||
|
||||
return;
|
||||
|
@ -74,10 +74,15 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
|
||||
iss >> index;
|
||||
iss.clear();
|
||||
|
||||
term = static_cast<unsigned int>(atoi(values[1]));
|
||||
zsql = values[2];
|
||||
iss.str(string(values[1]));
|
||||
iss >> term;
|
||||
iss.clear();
|
||||
|
||||
timestamp = static_cast<unsigned int>(atoi(values[3]));
|
||||
zsql = values[2];
|
||||
|
||||
iss.str(string(values[3]));
|
||||
iss >> timestamp;
|
||||
iss.clear();
|
||||
|
||||
iss.str(string(values[4]));
|
||||
iss >> fed_index;
|
||||
@ -115,8 +120,8 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp):
|
||||
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1), last_index(-1),
|
||||
last_term(-1), log_retention(_lret), limit_purge(_lp)
|
||||
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1),
|
||||
last_index(-1), last_term(-1), log_retention(_lret), limit_purge(_lp)
|
||||
{
|
||||
uint64_t r, i;
|
||||
|
||||
@ -130,7 +135,7 @@ LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp)
|
||||
|
||||
oss << time(0);
|
||||
|
||||
insert_log_record(0, 0, oss, time(0), -1, false);
|
||||
insert_log_record(0, 0, oss, time(0), UINT64_MAX, false);
|
||||
}
|
||||
|
||||
setup_index(r, i);
|
||||
@ -287,7 +292,8 @@ int LogDB::update_raft_state(std::string name, std::string& raft_xml)
|
||||
return -1;
|
||||
}
|
||||
|
||||
oss << "UPDATE system_attributes SET body ='" << sql_db << "' WHERE name = '" << name << "'";
|
||||
oss << "UPDATE system_attributes SET body ='" << sql_db
|
||||
<< "' WHERE name = '" << name << "'";
|
||||
|
||||
db->free_str(sql_db);
|
||||
|
||||
@ -297,8 +303,8 @@ int LogDB::update_raft_state(std::string name, std::string& raft_xml)
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
int LogDB::insert(uint64_t index, int term, const std::string& sql, time_t tstamp,
|
||||
uint64_t fed_index, bool replace)
|
||||
int LogDB::insert(uint64_t index, unsigned int term, const std::string& sql,
|
||||
time_t tstamp, uint64_t fed_index, bool replace)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
@ -371,8 +377,8 @@ int LogDB::apply_log_record(LogDBRecord * lr)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
oss << "UPDATE logdb SET timestamp = " << time(0) << ", applied = 1" << " WHERE "
|
||||
<< "log_index = " << lr->index << " AND timestamp = 0";
|
||||
oss << "UPDATE logdb SET timestamp = " << time(0) << ", applied = 1"
|
||||
<< " WHERE log_index = " << lr->index << " AND timestamp = 0";
|
||||
|
||||
if ( db->exec_wr(oss) != 0 )
|
||||
{
|
||||
@ -394,7 +400,7 @@ int LogDB::apply_log_record(LogDBRecord * lr)
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
|
||||
uint64_t LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
|
||||
time_t timestamp, uint64_t fed_index)
|
||||
{
|
||||
pthread_mutex_lock(&mutex);
|
||||
@ -418,7 +424,7 @@ uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
return -1;
|
||||
return UINT64_MAX;
|
||||
}
|
||||
|
||||
//allocate a replication request if log record is going to be replicated
|
||||
@ -433,7 +439,7 @@ uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
|
||||
|
||||
next_index++;
|
||||
|
||||
if ( fed_index != UINT64_MAX )
|
||||
if ( _fed_index != UINT64_MAX )
|
||||
{
|
||||
fed_log.insert(_fed_index);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user