mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-11 05:17:41 +03:00
F #4809: Compress DB commands.
This commit is contained in:
parent
384ab42235
commit
f12fdfaead
@ -65,25 +65,7 @@ private:
|
||||
/**
|
||||
* SQL callback to load logDBRecord from DB (SELECT commands)
|
||||
*/
|
||||
int select_cb(void *nil, int num, char **values, char **names)
|
||||
{
|
||||
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
|
||||
!values[4] || !values[5] || num != 6 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
index = static_cast<unsigned int>(atoi(values[0]));
|
||||
term = static_cast<unsigned int>(atoi(values[1]));
|
||||
sql = values[2];
|
||||
|
||||
timestamp = static_cast<unsigned int>(atoi(values[3]));
|
||||
|
||||
prev_index = static_cast<unsigned int>(atoi(values[4]));
|
||||
prev_term = static_cast<unsigned int>(atoi(values[5]));
|
||||
|
||||
return 0;
|
||||
}
|
||||
int select_cb(void *nil, int num, char **values, char **names);
|
||||
};
|
||||
|
||||
/**
|
||||
@ -112,11 +94,8 @@ public:
|
||||
/**
|
||||
* Applies the SQL command of the given record to the database. The
|
||||
* timestamp of the record is updated.
|
||||
* @param lr the log record
|
||||
* @param index of the log record
|
||||
*/
|
||||
int apply_log_record(LogDBRecord * lr);
|
||||
|
||||
int apply_log_records(unsigned int commit_index);
|
||||
|
||||
/**
|
||||
@ -278,6 +257,13 @@ private:
|
||||
|
||||
static const char * db_bootstrap;
|
||||
|
||||
/**
|
||||
* Applies the SQL command of the given record to the database. The
|
||||
* timestamp of the record is updated.
|
||||
* @param lr the log record
|
||||
*/
|
||||
int apply_log_record(LogDBRecord * lr);
|
||||
|
||||
/**
|
||||
* Inserts or update a log record in the database
|
||||
* @param index of the log entry
|
||||
|
@ -370,7 +370,10 @@ void RaftManager::leader()
|
||||
|
||||
requests.clear();
|
||||
|
||||
leader_hook->do_hook(0);
|
||||
if ( leader_hook != 0 )
|
||||
{
|
||||
leader_hook->do_hook(0);
|
||||
}
|
||||
|
||||
state = LEADER;
|
||||
|
||||
@ -431,7 +434,7 @@ void RaftManager::follower(unsigned int _term)
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
if ( state == LEADER )
|
||||
if ( state == LEADER && follower_hook != 0 )
|
||||
{
|
||||
follower_hook->do_hook(0);
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
#include "LogDB.h"
|
||||
#include "Nebula.h"
|
||||
#include "NebulaUtil.h"
|
||||
#include "ZoneServer.h"
|
||||
#include "Callbackable.h"
|
||||
|
||||
@ -33,6 +34,44 @@ const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
|
||||
{
|
||||
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
|
||||
!values[4] || !values[5] || num != 6 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::string zsql;
|
||||
|
||||
std::string * _sql;
|
||||
|
||||
index = static_cast<unsigned int>(atoi(values[0]));
|
||||
term = static_cast<unsigned int>(atoi(values[1]));
|
||||
zsql = values[2];
|
||||
|
||||
timestamp = static_cast<unsigned int>(atoi(values[3]));
|
||||
|
||||
prev_index = static_cast<unsigned int>(atoi(values[4]));
|
||||
prev_term = static_cast<unsigned int>(atoi(values[5]));
|
||||
|
||||
_sql = one_util::zlib_decompress(zsql, true);
|
||||
|
||||
if ( _sql == 0 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
sql = *_sql;
|
||||
|
||||
delete _sql;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db),
|
||||
next_index(0), last_applied(-1), last_index(-1), last_term(-1),
|
||||
log_retention(_lret)
|
||||
@ -155,21 +194,34 @@ int LogDB::get_raft_state(std::string &raft_xml)
|
||||
{
|
||||
ostringstream oss;
|
||||
|
||||
std::string zraft_xml;
|
||||
|
||||
single_cb<std::string> cb;
|
||||
|
||||
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
|
||||
|
||||
cb.set_callback(&raft_xml);
|
||||
cb.set_callback(&zraft_xml);
|
||||
|
||||
int rc = db->exec_rd(oss, &cb);
|
||||
|
||||
cb.unset_callback();
|
||||
|
||||
if ( raft_xml.empty() )
|
||||
if ( zraft_xml.empty() )
|
||||
{
|
||||
rc = -1;
|
||||
}
|
||||
|
||||
std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true);
|
||||
|
||||
if ( _raft_xml == 0 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
raft_xml = *_raft_xml;
|
||||
|
||||
delete _raft_xml;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -181,7 +233,18 @@ int LogDB::insert_replace(int index, int term, const std::string& sql,
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
char * sql_db = db->escape_str(sql.c_str());
|
||||
std::string * zsql;
|
||||
|
||||
zsql = one_util::zlib_compress(sql, true);
|
||||
|
||||
if ( zsql == 0 )
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
char * sql_db = db->escape_str(zsql->c_str());
|
||||
|
||||
delete zsql;
|
||||
|
||||
if ( sql_db == 0 )
|
||||
{
|
||||
@ -360,7 +423,7 @@ int LogDB::delete_log_records(unsigned int start_index)
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
oss << "DELETE FROM " << table << " WHERE log_index >= start_index";
|
||||
oss << "DELETE FROM " << table << " WHERE log_index >= " << start_index;
|
||||
|
||||
rc = db->exec_wr(oss);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user