mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-11 05:17:41 +03:00
F #4809: Do not write log in solo mode, update instead of insert for
timestamps
This commit is contained in:
parent
f12fdfaead
commit
50880bb271
@ -125,10 +125,7 @@ public:
|
|||||||
* @param raft attributes in XML format
|
* @param raft attributes in XML format
|
||||||
* @return 0 on success
|
* @return 0 on success
|
||||||
*/
|
*/
|
||||||
int insert_raft_state(std::string& raft_xml)
|
int update_raft_state(std::string& raft_xml);
|
||||||
{
|
|
||||||
return insert_replace(-1, -1, raft_xml, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the raft state attributes as stored in the log
|
* Returns the raft state attributes as stored in the log
|
||||||
@ -273,7 +270,7 @@ private:
|
|||||||
*
|
*
|
||||||
* @return 0 on success
|
* @return 0 on success
|
||||||
*/
|
*/
|
||||||
int insert_replace(int index, int term, const std::string& sql, time_t ts);
|
int insert(int index, int term, const std::string& sql, time_t ts);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inserts a new log record in the database. If the record is successfully
|
* Inserts a new log record in the database. If the record is successfully
|
||||||
|
@ -155,9 +155,9 @@ FEDERATION = [
|
|||||||
RAFT = [
|
RAFT = [
|
||||||
LOG_RETENTION = 500000,
|
LOG_RETENTION = 500000,
|
||||||
LOG_PURGE_TIMEOUT = 600,
|
LOG_PURGE_TIMEOUT = 600,
|
||||||
ELECTION_TIMEOUT_MS = 1500,
|
ELECTION_TIMEOUT_MS = 2500,
|
||||||
BROADCAST_TIMEOUT_MS = 500,
|
BROADCAST_TIMEOUT_MS = 500,
|
||||||
XMLRPC_TIMEOUT_MS = 100
|
XMLRPC_TIMEOUT_MS = 2000
|
||||||
]
|
]
|
||||||
|
|
||||||
# Executed when a server transits from follower->leader
|
# Executed when a server transits from follower->leader
|
||||||
|
@ -70,12 +70,18 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
|
|||||||
|
|
||||||
if ( logdb->get_raft_state(raft_xml) != 0 )
|
if ( logdb->get_raft_state(raft_xml) != 0 )
|
||||||
{
|
{
|
||||||
|
std::ostringstream bsr;
|
||||||
|
|
||||||
|
bsr << "bootstrap state";
|
||||||
|
|
||||||
|
logdb->insert_log_record(-1, -1, bsr, 0);
|
||||||
|
|
||||||
raft_state.replace("TERM", 0);
|
raft_state.replace("TERM", 0);
|
||||||
raft_state.replace("VOTEDFOR", -1);
|
raft_state.replace("VOTEDFOR", -1);
|
||||||
|
|
||||||
raft_state.to_xml(raft_xml);
|
raft_state.to_xml(raft_xml);
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_xml);
|
logdb->update_raft_state(raft_xml);
|
||||||
|
|
||||||
votedfor = -1;
|
votedfor = -1;
|
||||||
term = 0;
|
term = 0;
|
||||||
@ -411,7 +417,7 @@ void RaftManager::leader()
|
|||||||
frm->start_replica_threads();
|
frm->start_replica_threads();
|
||||||
}
|
}
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_state_xml);
|
logdb->update_raft_state(raft_state_xml);
|
||||||
|
|
||||||
NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
|
NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
|
||||||
}
|
}
|
||||||
@ -480,7 +486,7 @@ void RaftManager::follower(unsigned int _term)
|
|||||||
frm->stop_replica_threads();
|
frm->stop_replica_threads();
|
||||||
}
|
}
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_state_xml);
|
logdb->update_raft_state(raft_state_xml);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------- */
|
/* -------------------------------------------------------------------------- */
|
||||||
@ -672,7 +678,7 @@ int RaftManager::update_votedfor(int _votedfor)
|
|||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_state_xml);
|
logdb->update_raft_state(raft_state_xml);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -927,7 +933,7 @@ void RaftManager::request_vote()
|
|||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_state_xml);
|
logdb->update_raft_state(raft_state_xml);
|
||||||
|
|
||||||
logdb->get_last_record_index(lindex, lterm);
|
logdb->get_last_record_index(lindex, lterm);
|
||||||
|
|
||||||
@ -1008,7 +1014,7 @@ void RaftManager::request_vote()
|
|||||||
|
|
||||||
pthread_mutex_unlock(&mutex);
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
logdb->insert_raft_state(raft_state_xml);
|
logdb->update_raft_state(raft_state_xml);
|
||||||
|
|
||||||
srand(_server_id);
|
srand(_server_id);
|
||||||
|
|
||||||
|
@ -80,6 +80,18 @@ LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db),
|
|||||||
|
|
||||||
pthread_mutex_init(&mutex, 0);
|
pthread_mutex_init(&mutex, 0);
|
||||||
|
|
||||||
|
LogDBRecord lr;
|
||||||
|
|
||||||
|
|
||||||
|
if ( get_log_record(0, lr) != 0 || lr.sql.empty() )
|
||||||
|
{
|
||||||
|
std::ostringstream oss;
|
||||||
|
|
||||||
|
oss << time(0);
|
||||||
|
|
||||||
|
insert_log_record(0, 0, oss, time(0));
|
||||||
|
}
|
||||||
|
|
||||||
setup_index(r, i);
|
setup_index(r, i);
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -194,42 +206,49 @@ int LogDB::get_raft_state(std::string &raft_xml)
|
|||||||
{
|
{
|
||||||
ostringstream oss;
|
ostringstream oss;
|
||||||
|
|
||||||
std::string zraft_xml;
|
|
||||||
|
|
||||||
single_cb<std::string> cb;
|
single_cb<std::string> cb;
|
||||||
|
|
||||||
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
|
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
|
||||||
|
|
||||||
cb.set_callback(&zraft_xml);
|
cb.set_callback(&raft_xml);
|
||||||
|
|
||||||
int rc = db->exec_rd(oss, &cb);
|
int rc = db->exec_rd(oss, &cb);
|
||||||
|
|
||||||
cb.unset_callback();
|
cb.unset_callback();
|
||||||
|
|
||||||
if ( zraft_xml.empty() )
|
if ( raft_xml.empty() )
|
||||||
{
|
{
|
||||||
rc = -1;
|
rc = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true);
|
|
||||||
|
|
||||||
if ( _raft_xml == 0 )
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
raft_xml = *_raft_xml;
|
|
||||||
|
|
||||||
delete _raft_xml;
|
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------- */
|
/* -------------------------------------------------------------------------- */
|
||||||
/* -------------------------------------------------------------------------- */
|
/* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
int LogDB::insert_replace(int index, int term, const std::string& sql,
|
int LogDB::update_raft_state(std::string& raft_xml)
|
||||||
time_t timestamp)
|
{
|
||||||
|
std::ostringstream oss;
|
||||||
|
|
||||||
|
char * sql_db = db->escape_str(raft_xml.c_str());
|
||||||
|
|
||||||
|
if ( sql_db == 0 )
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
oss << "UPDATE logdb SET sql ='" << sql_db << "' WHERE log_index = -1";
|
||||||
|
|
||||||
|
db->free_str(sql_db);
|
||||||
|
|
||||||
|
return db->exec_wr(oss);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -------------------------------------------------------------------------- */
|
||||||
|
/* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
|
||||||
{
|
{
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
|
|
||||||
@ -251,14 +270,27 @@ int LogDB::insert_replace(int index, int term, const std::string& sql,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES ("
|
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
|
||||||
<< index << ","
|
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")";
|
||||||
<< term << ","
|
|
||||||
<< "'" << sql_db << "',"
|
|
||||||
<< timestamp << ")";
|
|
||||||
|
|
||||||
int rc = db->exec_wr(oss);
|
int rc = db->exec_wr(oss);
|
||||||
|
|
||||||
|
if ( rc != 0 )
|
||||||
|
{
|
||||||
|
//Check for duplicate (leader retrying i.e. xmlrpc client timeout)
|
||||||
|
LogDBRecord lr;
|
||||||
|
|
||||||
|
if ( get_log_record(index, lr) == 0 && !lr.sql.empty() )
|
||||||
|
{
|
||||||
|
NebulaLog::log("DBM", Log::ERROR, "Duplicated log record");
|
||||||
|
rc = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
rc = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
db->free_str(sql_db);
|
db->free_str(sql_db);
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
@ -277,7 +309,15 @@ int LogDB::apply_log_record(LogDBRecord * lr)
|
|||||||
|
|
||||||
if ( rc == 0 )
|
if ( rc == 0 )
|
||||||
{
|
{
|
||||||
insert_replace(lr->index, lr->term, lr->sql, time(0));
|
std::ostringstream oss;
|
||||||
|
|
||||||
|
oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE "
|
||||||
|
<< "log_index = " << lr->index << " AND timestamp = 0";
|
||||||
|
|
||||||
|
if ( db->exec_wr(oss) != 0 )
|
||||||
|
{
|
||||||
|
NebulaLog::log("DBM", Log::ERROR, "Cannot update log record");
|
||||||
|
}
|
||||||
|
|
||||||
last_applied = lr->index;
|
last_applied = lr->index;
|
||||||
}
|
}
|
||||||
@ -295,7 +335,7 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
|
|||||||
|
|
||||||
unsigned int index = next_index;
|
unsigned int index = next_index;
|
||||||
|
|
||||||
if ( insert_replace(index, term, sql.str(), timestamp) != 0 )
|
if ( insert(index, term, sql.str(), timestamp) != 0 )
|
||||||
{
|
{
|
||||||
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
|
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
|
||||||
|
|
||||||
@ -325,7 +365,7 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term,
|
|||||||
|
|
||||||
pthread_mutex_lock(&mutex);
|
pthread_mutex_lock(&mutex);
|
||||||
|
|
||||||
rc = insert_replace(index, term, sql.str(), timestamp);
|
rc = insert(index, term, sql.str(), timestamp);
|
||||||
|
|
||||||
if ( rc == 0 )
|
if ( rc == 0 )
|
||||||
{
|
{
|
||||||
@ -358,12 +398,6 @@ int LogDB::exec_wr(ostringstream& cmd)
|
|||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
if ( solo )
|
if ( solo )
|
||||||
{
|
{
|
||||||
//TODO USE LAST_TERM IN SOlO MODE TO REENGAGE HA
|
|
||||||
if ( insert_log_record(0, cmd, time(0)) == -1 )
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return db->exec_wr(cmd);
|
return db->exec_wr(cmd);
|
||||||
}
|
}
|
||||||
else if ( raftm == 0 || !raftm->is_leader() )
|
else if ( raftm == 0 || !raftm->is_leader() )
|
||||||
@ -492,8 +526,6 @@ int LogDB::purge_log()
|
|||||||
|
|
||||||
unsigned int delete_index = last_index - log_retention;
|
unsigned int delete_index = last_index - log_retention;
|
||||||
|
|
||||||
oss.str("");
|
|
||||||
|
|
||||||
// keep the last "log_retention" records as well as those not applied to DB
|
// keep the last "log_retention" records as well as those not applied to DB
|
||||||
oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
|
oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
|
||||||
<< "AND log_index < " << delete_index;
|
<< "AND log_index < " << delete_index;
|
||||||
|
Loading…
Reference in New Issue
Block a user