1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-04-02 10:50:07 +03:00

F #4809: Fix some bugs when replicating log entries to slave zones

This commit is contained in:
Ruben S. Montero 2017-05-22 18:04:16 +02:00
parent 4a780941ec
commit 31ba4d4d85
8 changed files with 81 additions and 52 deletions

View File

@ -141,4 +141,32 @@ private:
T * value;
};
template<>
class single_cb<std::string> : public Callbackable
{
public:
void set_callback(std::string * _value)
{
value = _value;
Callbackable::set_callback(
static_cast<Callbackable::Callback>(&single_cb::callback));
}
virtual int callback(void *nil, int num, char **values, char **names)
{
if ( values == 0 || values[0] == 0 || num != 1 )
{
return -1;
}
*value = values[0];
return 0;
}
private:
std::string * value;
};
#endif /*CALLBACKABLE_H_*/

View File

@ -37,10 +37,8 @@ public:
* @param _p purge timeout for log
* @param d pointer to underlying DB (LogDB)
* @param l log_retention length (num records)
* @param s true if operating in solo mode
*/
FedReplicaManager(time_t _t, time_t _p, SqlDB * d, const std::string& l,
bool s);
FedReplicaManager(time_t _t, time_t _p, SqlDB * d, const std::string& l);
virtual ~FedReplicaManager();
@ -185,8 +183,8 @@ private:
// -------------------------------------------------------------------------
struct ZoneServers
{
ZoneServers(int z, const std::map<int,std::string>& s):zone_id(z),
servers(s), next(0){};
ZoneServers(int z, unsigned int l, const std::map<int,std::string>& s):
zone_id(z), servers(s), next(l){};
~ZoneServers(){};

View File

@ -681,7 +681,7 @@ private:
"/DEFAULT_GROUP_QUOTAS/NETWORK_QUOTA",
"/DEFAULT_GROUP_QUOTAS/IMAGE_QUOTA",
"/DEFAULT_GROUP_QUOTAS/VM_QUOTA"),
system_db(0), logdb(0),
system_db(0), logdb(0), fed_logdb(0),
vmpool(0), hpool(0), vnpool(0), upool(0), ipool(0), gpool(0), tpool(0),
dspool(0), clpool(0), docpool(0), zonepool(0), secgrouppool(0),
vdcpool(0), vrouterpool(0), marketpool(0), apppool(0), vmgrouppool(0),
@ -754,6 +754,7 @@ private:
delete frm;
delete nebula_configuration;
delete logdb;
delete fed_logdb;
delete system_db;
};
@ -809,6 +810,7 @@ private:
// ---------------------------------------------------------------
LogDB * logdb;
FedLogDB * fed_logdb;
VirtualMachinePool * vmpool;
HostPool * hpool;
VirtualNetworkPool * vnpool;

View File

@ -58,15 +58,8 @@ public:
~RaftManager()
{
if ( leader_hook != 0 )
{
delete(leader_hook);
}
if ( follower_hook != 0 )
{
delete(follower_hook);
}
delete leader_hook;
delete follower_hook;
};
// -------------------------------------------------------------------------

View File

@ -39,14 +39,16 @@ using namespace std;
void Nebula::start(bool bootstrap_only)
{
int rc;
int fd;
sigset_t mask;
int signal;
char hn[80];
string scripts_remote_dir;
SqlDB * db_backend;
bool solo;
int rc;
int fd;
sigset_t mask;
int signal;
char hn[80];
string scripts_remote_dir;
SqlDB * db_backend;
bool solo;
SqlDB * db_ptr;
if ( gethostname(hn,79) != 0 )
{
@ -321,6 +323,16 @@ void Nebula::start(bool bootstrap_only)
logdb = new LogDB(db_backend, solo, log_retention);
if ( federation_master )
{
fed_logdb = new FedLogDB(logdb);
db_ptr = fed_logdb;
}
else
{
db_ptr = logdb;
}
// ---------------------------------------------------------------------
// Prepare the SystemDB and check versions
// ---------------------------------------------------------------------
@ -468,7 +480,7 @@ void Nebula::start(bool bootstrap_only)
// ---- ACL Manager ----
try
{
aclm = new AclManager(logdb, zone_id, is_federation_slave(),
aclm = new AclManager(db_ptr, zone_id, is_federation_slave(),
timer_period);
}
catch (bad_alloc&)
@ -595,14 +607,14 @@ void Nebula::start(bool bootstrap_only)
nebula_configuration->get("GROUP_HOOK", group_hooks);
gpool = new GroupPool(logdb, group_hooks, remotes_location,
gpool = new GroupPool(db_ptr, group_hooks, remotes_location,
is_federation_slave());
nebula_configuration->get("SESSION_EXPIRATION_TIME", expiration_time);
nebula_configuration->get("USER_HOOK", user_hooks);
upool = new UserPool(logdb, expiration_time, user_hooks,
upool = new UserPool(db_ptr, expiration_time, user_hooks,
remotes_location, is_federation_slave());
/* -------------------- Image/Datastore Pool ------------------------ */
@ -635,15 +647,15 @@ void Nebula::start(bool bootstrap_only)
/* ----- Document, Zone, VDC, VMTemplate, SG and Makerket Pools ----- */
docpool = new DocumentPool(logdb);
zonepool = new ZonePool(logdb, is_federation_slave());
vdcpool = new VdcPool(logdb, is_federation_slave());
zonepool = new ZonePool(db_ptr, is_federation_slave());
vdcpool = new VdcPool(db_ptr, is_federation_slave());
tpool = new VMTemplatePool(logdb);
secgrouppool = new SecurityGroupPool(logdb);
marketpool = new MarketPlacePool(logdb, is_federation_slave());
apppool = new MarketPlaceAppPool(logdb, is_federation_slave());
marketpool = new MarketPlacePool(db_ptr, is_federation_slave());
apppool = new MarketPlaceAppPool(db_ptr, is_federation_slave());
vmgrouppool = new VMGroupPool(logdb);
@ -697,8 +709,7 @@ void Nebula::start(bool bootstrap_only)
// ---- FedReplica Manager ----
try
{
frm = new FedReplicaManager(timer_period, log_purge, logdb,
log_retention, solo);
frm = new FedReplicaManager(timer_period,log_purge,logdb,log_retention);
}
catch (bad_alloc&)
{
@ -707,6 +718,13 @@ void Nebula::start(bool bootstrap_only)
rc = frm->start();
if ( is_federation_master() && solo )
{
// Replica threads are started on master in solo mode.
// HA start/stop the replica threads on leader/follower states will
frm->start_replica_threads();
}
if ( rc != 0 )
{
throw runtime_error("Could not start the Federation Replica Manager");

View File

@ -35,23 +35,14 @@ const time_t FedReplicaManager::xmlrpc_timeout_ms = 2000;
/* -------------------------------------------------------------------------- */
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d,
const std::string& l, bool s): ReplicaManager(), timer_period(_t),
purge_period(_p), logdb(d), log_retention(l)
const std::string& l): ReplicaManager(), timer_period(_t), purge_period(_p),
logdb(d), log_retention(l)
{
Nebula& nd = Nebula::instance();
pthread_mutex_init(&mutex, 0);
am.addListener(this);
get_last_index(last_index);
// Replica threads are started on master in solo mode.
// HA start/stop the replica threads on leader/follower states will
if ( nd.is_federation_master() && s )
{
start_replica_threads();
}
};
/* -------------------------------------------------------------------------- */
@ -82,7 +73,7 @@ int FedReplicaManager::replicate(const std::string& sql)
{
pthread_mutex_lock(&mutex);
if ( insert_log_record(last_index+1, sql) == 0 )
if ( insert_log_record(last_index+1, sql) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
@ -115,7 +106,7 @@ int FedReplicaManager::apply_log_record(int index, const std::string& sql)
return rc;
}
if ( insert_log_record(last_index + 1, sql) == 0 )
if ( insert_log_record(last_index + 1, sql) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
@ -212,7 +203,7 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
{
zpool->get_zone_servers(*it, zone_servers);
ZoneServers * zs = new ZoneServers(*it, zone_servers);
ZoneServers * zs = new ZoneServers(*it, last_index, zone_servers);
zones.insert(make_pair(*it, zs));
@ -237,10 +228,10 @@ void FedReplicaManager::add_zone(int zone_id)
zpool->get_zone_servers(zone_id, zone_servers);
ZoneServers * zs = new ZoneServers(zone_id, zone_servers);
pthread_mutex_lock(&mutex);
ZoneServers * zs = new ZoneServers(zone_id, last_index, zone_servers);
zones.insert(make_pair(zone_id, zs));
pthread_mutex_unlock(&mutex);

View File

@ -393,7 +393,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
}
else if ( rc < 0 )
{
oss << "Error replicating log entry " << index << "in zone";
oss << "Error replicating log entry " << index << " in zone";
NebulaLog::log("ReM", Log::INFO, oss);

View File

@ -432,7 +432,7 @@ int LogDB::purge_log()
int FedLogDB::exec_wr(ostringstream& cmd)
{
//FedReplicaManager * fedrm = Nebula::instance().get_fedrm();
FedReplicaManager * frm = Nebula::instance().get_frm();
int rc = _logdb->exec_wr(cmd);
@ -441,8 +441,7 @@ int FedLogDB::exec_wr(ostringstream& cmd)
return rc;
}
//Replicate on slaves
// fedrm->replicate(cmd);
frm->replicate(cmd.str());
return rc;
}