mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-16 22:50:10 +03:00
development: Add extra debug information for HA and federated log purge
co-authored-by: Christian González <cgonzalez@opennebula.systems> (cherry picked from commit 0de6fa7493cc6ae7d740dea235645f1d41ad0ff2)
This commit is contained in:
parent
11523ef7ed
commit
b25042ab53
@ -327,5 +327,50 @@ public:
|
||||
};
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
template< template<class...> class Container, class T>
|
||||
class multiple_cb : public Callbackable
|
||||
{
|
||||
public:
|
||||
void set_callback(Container<T> * _columns)
|
||||
{
|
||||
columns = _columns;
|
||||
|
||||
Callbackable::set_callback(
|
||||
static_cast<Callbackable::Callback>(&multiple_cb::callback));
|
||||
};
|
||||
|
||||
int callback(void * nil, int num, char **values, char **names)
|
||||
{
|
||||
if ( num == 0 || values == 0 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i=0; i < num ; ++i)
|
||||
{
|
||||
if (values[i] == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
std::istringstream iss(values[i]);
|
||||
|
||||
T value;
|
||||
|
||||
iss >> value;
|
||||
|
||||
columns->push_back(value);
|
||||
}
|
||||
|
||||
return 0;
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
Container<T> * columns;
|
||||
};
|
||||
|
||||
#endif /*CALLBACKABLE_H_*/
|
||||
|
@ -802,13 +802,9 @@ void RaftManager::timer_action(const ActionRequest& ar)
|
||||
// Database housekeeping
|
||||
if ( (purge_tics * timer_period_ms) >= purge_period_ms )
|
||||
{
|
||||
ostringstream oss;
|
||||
|
||||
Nebula& nd = Nebula::instance();
|
||||
LogDB * logdb = nd.get_logdb();
|
||||
|
||||
oss << "Purging obsolete LogDB records: ";
|
||||
|
||||
int rc = logdb->purge_log();
|
||||
|
||||
purge_tics = 0;
|
||||
@ -817,66 +813,61 @@ void RaftManager::timer_action(const ActionRequest& ar)
|
||||
{
|
||||
purge_tics = (int) ((purge_period_ms - 60000)/timer_period_ms);
|
||||
}
|
||||
|
||||
oss << rc << " records purged";
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, oss);
|
||||
}
|
||||
|
||||
// Leadership
|
||||
struct timespec the_time;
|
||||
// Leadership
|
||||
struct timespec the_time;
|
||||
|
||||
clock_gettime(CLOCK_REALTIME, &the_time);
|
||||
clock_gettime(CLOCK_REALTIME, &the_time);
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
if ( state == LEADER ) // Send the heartbeat
|
||||
{
|
||||
time_t sec = last_heartbeat.tv_sec + broadcast_timeout.tv_sec;
|
||||
long nsec = last_heartbeat.tv_nsec + broadcast_timeout.tv_nsec;
|
||||
if ( state == LEADER ) // Send the heartbeat
|
||||
{
|
||||
time_t sec = last_heartbeat.tv_sec + broadcast_timeout.tv_sec;
|
||||
long nsec = last_heartbeat.tv_nsec + broadcast_timeout.tv_nsec;
|
||||
|
||||
|
||||
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
|
||||
nsec <= the_time.tv_nsec))
|
||||
{
|
||||
heartbeat_manager.replicate();
|
||||
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
|
||||
nsec <= the_time.tv_nsec))
|
||||
{
|
||||
heartbeat_manager.replicate();
|
||||
|
||||
clock_gettime(CLOCK_REALTIME, &last_heartbeat);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
else if ( state == FOLLOWER )
|
||||
{
|
||||
time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec;
|
||||
long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec;
|
||||
|
||||
}
|
||||
else if ( state == FOLLOWER )
|
||||
{
|
||||
time_t sec = last_heartbeat.tv_sec + election_timeout.tv_sec;
|
||||
long nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec;
|
||||
|
||||
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
|
||||
nsec <= the_time.tv_nsec))
|
||||
{
|
||||
NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from "
|
||||
"leader. Starting election proccess");
|
||||
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
|
||||
nsec <= the_time.tv_nsec))
|
||||
{
|
||||
NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from "
|
||||
"leader. Starting election proccess");
|
||||
|
||||
state = CANDIDATE;
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
request_vote();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
else //SOLO or CANDIDATE, do nothing
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
else //SOLO or CANDIDATE, do nothing
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -576,17 +576,35 @@ int LogDB::apply_log_records(unsigned int commit_index)
|
||||
|
||||
int LogDB::purge_log()
|
||||
{
|
||||
std::ostringstream oss;
|
||||
std::ostringstream oss, foss;
|
||||
|
||||
empty_cb cb;
|
||||
|
||||
int rc = 0;
|
||||
multiple_cb<std::vector, int> cb_info;
|
||||
std::vector<int> maxmin_i;
|
||||
std::vector<int> maxmin_e;
|
||||
|
||||
int rc = 0;
|
||||
int frc = 0;
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
/* ---------------- Record log state -------------------- */
|
||||
|
||||
oss << "SELECT MIN(log_index), MAX(log_index) FROM logdb WHERE log_index >= 0";
|
||||
|
||||
cb_info.set_callback(&maxmin_i);
|
||||
|
||||
db->exec_rd(oss, &cb_info);
|
||||
|
||||
cb_info.unset_callback();
|
||||
|
||||
/* ---------------------------------------------------------------------- */
|
||||
/* Non-federated records. Keep last log_retention records */
|
||||
/* ---------------------------------------------------------------------- */
|
||||
cb.set_affected_rows(0);
|
||||
|
||||
oss.str("");
|
||||
oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
|
||||
<< "AND fed_index = -1 AND log_index < ("
|
||||
<< " SELECT MIN(i.log_index) FROM ("
|
||||
@ -606,11 +624,35 @@ int LogDB::purge_log()
|
||||
rc = cb.get_affected_rows();
|
||||
}
|
||||
|
||||
/* ---------------- Record log state -------------------- */
|
||||
|
||||
oss.str("");
|
||||
oss << "SELECT MIN(log_index), MAX(log_index) FROM logdb WHERE log_index >= 0";
|
||||
|
||||
cb_info.set_callback(&maxmin_e);
|
||||
|
||||
db->exec_rd(oss, &cb_info);
|
||||
|
||||
cb_info.unset_callback();
|
||||
|
||||
oss.str("");
|
||||
oss << "Purging obsolete LogDB records: " << rc << " records purged. Log state: "
|
||||
<< maxmin_i[0] << "," << maxmin_i[1] << " - " << maxmin_e[0] << "," << maxmin_e[1];
|
||||
|
||||
NebulaLog::log("DBM", Log::INFO, oss);
|
||||
|
||||
/* ---------------------------------------------------------------------- */
|
||||
/* Federated records. Keep last log_retention federated records */
|
||||
/* ---------------------------------------------------------------------- */
|
||||
if ( fed_log.size() < log_retention )
|
||||
|
||||
foss << "Purging obsolete federated LogDB records: ";
|
||||
|
||||
if ( fed_log.size() < log_retention )
|
||||
{
|
||||
foss << "0 records purged. Federated log size: " << fed_log.size();
|
||||
|
||||
NebulaLog::log("DBM", Log::INFO, foss);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
return rc;
|
||||
@ -619,7 +661,6 @@ int LogDB::purge_log()
|
||||
cb.set_affected_rows(0);
|
||||
|
||||
oss.str("");
|
||||
|
||||
oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
|
||||
<< "AND fed_index != -1 AND log_index < ("
|
||||
<< " SELECT MIN(i.log_index) FROM ("
|
||||
@ -636,11 +677,17 @@ int LogDB::purge_log()
|
||||
|
||||
if ( db->exec_wr(oss, &cb) != -1 )
|
||||
{
|
||||
rc += cb.get_affected_rows();
|
||||
frc = cb.get_affected_rows();
|
||||
|
||||
rc += frc;
|
||||
}
|
||||
|
||||
build_federated_index();
|
||||
|
||||
foss << frc << " records purged. Federated log size: " << fed_log.size();
|
||||
|
||||
NebulaLog::log("DBM", Log::INFO, foss);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
return rc;
|
||||
|
Loading…
x
Reference in New Issue
Block a user