diff --git a/include/Callbackable.h b/include/Callbackable.h index 7c5fedab4e..f5ac07333d 100644 --- a/include/Callbackable.h +++ b/include/Callbackable.h @@ -327,5 +327,50 @@ public: }; }; +/* -------------------------------------------------------------------------- */ +/* -------------------------------------------------------------------------- */ + +template< template class Container, class T> +class multiple_cb : public Callbackable +{ +public: + void set_callback(Container * _columns) + { + columns = _columns; + + Callbackable::set_callback( + static_cast(&multiple_cb::callback)); + }; + + int callback(void * nil, int num, char **values, char **names) + { + if ( num == 0 || values == 0 ) + { + return -1; + } + + for (int i=0; i < num ; ++i) + { + if (values[i] == 0) + { + continue; + } + + std::istringstream iss(values[i]); + + T value; + + iss >> value; + + columns->push_back(value); + } + + return 0; + }; + +private: + + Container * columns; +}; #endif /*CALLBACKABLE_H_*/ diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index aa948ed4c9..027fa1dc2c 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -802,13 +802,9 @@ void RaftManager::timer_action(const ActionRequest& ar) // Database housekeeping if ( (purge_tics * timer_period_ms) >= purge_period_ms ) { - ostringstream oss; - Nebula& nd = Nebula::instance(); LogDB * logdb = nd.get_logdb(); - oss << "Purging obsolete LogDB records: "; - int rc = logdb->purge_log(); purge_tics = 0; @@ -817,66 +813,61 @@ void RaftManager::timer_action(const ActionRequest& ar) { purge_tics = (int) ((purge_period_ms - 60000)/timer_period_ms); } - - oss << rc << " records purged"; - - NebulaLog::log("RCM", Log::INFO, oss); } - // Leadership - struct timespec the_time; + // Leadership + struct timespec the_time; - clock_gettime(CLOCK_REALTIME, &the_time); + clock_gettime(CLOCK_REALTIME, &the_time); - pthread_mutex_lock(&mutex); + pthread_mutex_lock(&mutex); - if ( state == LEADER ) // Send the heartbeat - { - time_t sec = last_heartbeat.tv_sec + broadcast_timeout.tv_sec; - long nsec = last_heartbeat.tv_nsec + broadcast_timeout.tv_nsec; + if ( state == LEADER ) // Send the heartbeat + { + time_t sec = last_heartbeat.tv_sec + broadcast_timeout.tv_sec; + long nsec = last_heartbeat.tv_nsec + broadcast_timeout.tv_nsec; - if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && - nsec <= the_time.tv_nsec)) - { - heartbeat_manager.replicate(); + if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && + nsec <= the_time.tv_nsec)) + { + heartbeat_manager.replicate(); clock_gettime(CLOCK_REALTIME, &last_heartbeat); pthread_mutex_unlock(&mutex); - } + } else { pthread_mutex_unlock(&mutex); } + } + else if ( state == FOLLOWER ) + { + time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec; + long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec; - } - else if ( state == FOLLOWER ) - { - time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec; - long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec; - - if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && - nsec <= the_time.tv_nsec)) - { - NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from " - "leader. Starting election proccess"); + if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && + nsec <= the_time.tv_nsec)) + { + NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from " + "leader. Starting election proccess"); state = CANDIDATE; pthread_mutex_unlock(&mutex); request_vote(); - } + } else { pthread_mutex_unlock(&mutex); } - } - else //SOLO or CANDIDATE, do nothing - { - pthread_mutex_unlock(&mutex); - } + } + else //SOLO or CANDIDATE, do nothing + { + pthread_mutex_unlock(&mutex); + } return; } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 4bd585079d..09cb9a9d2d 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -576,17 +576,35 @@ int LogDB::apply_log_records(unsigned int commit_index) int LogDB::purge_log() { - std::ostringstream oss; + std::ostringstream oss, foss; empty_cb cb; - int rc = 0; + multiple_cb cb_info; + std::vector maxmin_i; + std::vector maxmin_e; + + int rc = 0; + int frc = 0; pthread_mutex_lock(&mutex); + /* ---------------- Record log state -------------------- */ + + oss << "SELECT MIN(log_index), MAX(log_index) FROM logdb WHERE log_index >= 0"; + + cb_info.set_callback(&maxmin_i); + + db->exec_rd(oss, &cb_info); + + cb_info.unset_callback(); + /* ---------------------------------------------------------------------- */ /* Non-federated records. Keep last log_retention records */ /* ---------------------------------------------------------------------- */ + cb.set_affected_rows(0); + + oss.str(""); oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " << "AND fed_index = -1 AND log_index < (" << " SELECT MIN(i.log_index) FROM (" @@ -606,11 +624,35 @@ int LogDB::purge_log() rc = cb.get_affected_rows(); } + /* ---------------- Record log state -------------------- */ + + oss.str(""); + oss << "SELECT MIN(log_index), MAX(log_index) FROM logdb WHERE log_index >= 0"; + + cb_info.set_callback(&maxmin_e); + + db->exec_rd(oss, &cb_info); + + cb_info.unset_callback(); + + oss.str(""); + oss << "Purging obsolete LogDB records: " << rc << " records purged. Log state: " + << maxmin_i[0] << "," << maxmin_i[1] << " - " << maxmin_e[0] << "," << maxmin_e[1]; + + NebulaLog::log("DBM", Log::INFO, oss); + /* ---------------------------------------------------------------------- */ /* Federated records. Keep last log_retention federated records */ /* ---------------------------------------------------------------------- */ - if ( fed_log.size() < log_retention ) + + foss << "Purging obsolete federated LogDB records: "; + + if ( fed_log.size() < log_retention ) { + foss << "0 records purged. Federated log size: " << fed_log.size(); + + NebulaLog::log("DBM", Log::INFO, foss); + pthread_mutex_unlock(&mutex); return rc; @@ -619,7 +661,6 @@ int LogDB::purge_log() cb.set_affected_rows(0); oss.str(""); - oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " << "AND fed_index != -1 AND log_index < (" << " SELECT MIN(i.log_index) FROM (" @@ -636,11 +677,17 @@ int LogDB::purge_log() if ( db->exec_wr(oss, &cb) != -1 ) { - rc += cb.get_affected_rows(); + frc = cb.get_affected_rows(); + + rc += frc; } build_federated_index(); + foss << frc << " records purged. Federated log size: " << fed_log.size(); + + NebulaLog::log("DBM", Log::INFO, foss); + pthread_mutex_unlock(&mutex); return rc;