diff --git a/include/LogDB.h b/include/LogDB.h index 92e4dcae14..8d8b51d75d 100644 --- a/include/LogDB.h +++ b/include/LogDB.h @@ -125,10 +125,7 @@ public: * @param raft attributes in XML format * @return 0 on success */ - int insert_raft_state(std::string& raft_xml) - { - return insert_replace(-1, -1, raft_xml, 0); - } + int update_raft_state(std::string& raft_xml); /** * Returns the raft state attributes as stored in the log @@ -273,7 +270,7 @@ private: * * @return 0 on success */ - int insert_replace(int index, int term, const std::string& sql, time_t ts); + int insert(int index, int term, const std::string& sql, time_t ts); /** * Inserts a new log record in the database. If the record is successfully diff --git a/share/etc/oned.conf b/share/etc/oned.conf index b160771dd7..4dff0d953c 100644 --- a/share/etc/oned.conf +++ b/share/etc/oned.conf @@ -155,9 +155,9 @@ FEDERATION = [ RAFT = [ LOG_RETENTION = 500000, LOG_PURGE_TIMEOUT = 600, - ELECTION_TIMEOUT_MS = 1500, + ELECTION_TIMEOUT_MS = 2500, BROADCAST_TIMEOUT_MS = 500, - XMLRPC_TIMEOUT_MS = 100 + XMLRPC_TIMEOUT_MS = 2000 ] # Executed when a server transits from follower->leader diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index 3b8f372ceb..e91b7312fd 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -70,12 +70,18 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad, if ( logdb->get_raft_state(raft_xml) != 0 ) { + std::ostringstream bsr; + + bsr << "bootstrap state"; + + logdb->insert_log_record(-1, -1, bsr, 0); + raft_state.replace("TERM", 0); raft_state.replace("VOTEDFOR", -1); raft_state.to_xml(raft_xml); - logdb->insert_raft_state(raft_xml); + logdb->update_raft_state(raft_xml); votedfor = -1; term = 0; @@ -411,7 +417,7 @@ void RaftManager::leader() frm->start_replica_threads(); } - logdb->insert_raft_state(raft_state_xml); + logdb->update_raft_state(raft_state_xml); NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone"); } @@ -480,7 +486,7 @@ void RaftManager::follower(unsigned int _term) frm->stop_replica_threads(); } - logdb->insert_raft_state(raft_state_xml); + logdb->update_raft_state(raft_state_xml); } /* -------------------------------------------------------------------------- */ @@ -672,7 +678,7 @@ int RaftManager::update_votedfor(int _votedfor) pthread_mutex_unlock(&mutex); - logdb->insert_raft_state(raft_state_xml); + logdb->update_raft_state(raft_state_xml); return 0; } @@ -927,7 +933,7 @@ void RaftManager::request_vote() pthread_mutex_unlock(&mutex); - logdb->insert_raft_state(raft_state_xml); + logdb->update_raft_state(raft_state_xml); logdb->get_last_record_index(lindex, lterm); @@ -1008,7 +1014,7 @@ void RaftManager::request_vote() pthread_mutex_unlock(&mutex); - logdb->insert_raft_state(raft_state_xml); + logdb->update_raft_state(raft_state_xml); srand(_server_id); diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 70de9b00e6..a29ea8093f 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -80,6 +80,18 @@ LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db), pthread_mutex_init(&mutex, 0); + LogDBRecord lr; + + + if ( get_log_record(0, lr) != 0 || lr.sql.empty() ) + { + std::ostringstream oss; + + oss << time(0); + + insert_log_record(0, 0, oss, time(0)); + } + setup_index(r, i); }; @@ -194,42 +206,49 @@ int LogDB::get_raft_state(std::string &raft_xml) { ostringstream oss; - std::string zraft_xml; - single_cb cb; oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1"; - cb.set_callback(&zraft_xml); + cb.set_callback(&raft_xml); int rc = db->exec_rd(oss, &cb); cb.unset_callback(); - if ( zraft_xml.empty() ) + if ( raft_xml.empty() ) { rc = -1; } - std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true); - - if ( _raft_xml == 0 ) - { - return -1; - } - - raft_xml = *_raft_xml; - - delete _raft_xml; - return rc; } /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -int LogDB::insert_replace(int index, int term, const std::string& sql, - time_t timestamp) +int LogDB::update_raft_state(std::string& raft_xml) +{ + std::ostringstream oss; + + char * sql_db = db->escape_str(raft_xml.c_str()); + + if ( sql_db == 0 ) + { + return -1; + } + + oss << "UPDATE logdb SET sql ='" << sql_db << "' WHERE log_index = -1"; + + db->free_str(sql_db); + + return db->exec_wr(oss); +} + +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + +int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp) { std::ostringstream oss; @@ -251,14 +270,27 @@ int LogDB::insert_replace(int index, int term, const std::string& sql, return -1; } - oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (" - << index << "," - << term << "," - << "'" << sql_db << "'," - << timestamp << ")"; + oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES (" + << index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")"; int rc = db->exec_wr(oss); + if ( rc != 0 ) + { + //Check for duplicate (leader retrying i.e. xmlrpc client timeout) + LogDBRecord lr; + + if ( get_log_record(index, lr) == 0 && !lr.sql.empty() ) + { + NebulaLog::log("DBM", Log::ERROR, "Duplicated log record"); + rc = 0; + } + else + { + rc = -1; + } + } + db->free_str(sql_db); return rc; @@ -277,7 +309,15 @@ int LogDB::apply_log_record(LogDBRecord * lr) if ( rc == 0 ) { - insert_replace(lr->index, lr->term, lr->sql, time(0)); + std::ostringstream oss; + + oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE " + << "log_index = " << lr->index << " AND timestamp = 0"; + + if ( db->exec_wr(oss) != 0 ) + { + NebulaLog::log("DBM", Log::ERROR, "Cannot update log record"); + } last_applied = lr->index; } @@ -295,7 +335,7 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql, unsigned int index = next_index; - if ( insert_replace(index, term, sql.str(), timestamp) != 0 ) + if ( insert(index, term, sql.str(), timestamp) != 0 ) { NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB"); @@ -325,7 +365,7 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term, pthread_mutex_lock(&mutex); - rc = insert_replace(index, term, sql.str(), timestamp); + rc = insert(index, term, sql.str(), timestamp); if ( rc == 0 ) { @@ -358,12 +398,6 @@ int LogDB::exec_wr(ostringstream& cmd) // ------------------------------------------------------------------------- if ( solo ) { - //TODO USE LAST_TERM IN SOlO MODE TO REENGAGE HA - if ( insert_log_record(0, cmd, time(0)) == -1 ) - { - return -1; - } - return db->exec_wr(cmd); } else if ( raftm == 0 || !raftm->is_leader() ) @@ -492,8 +526,6 @@ int LogDB::purge_log() unsigned int delete_index = last_index - log_retention; - oss.str(""); - // keep the last "log_retention" records as well as those not applied to DB oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " << "AND log_index < " << delete_index;