1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-31 01:47:11 +03:00

development: Fix race condition between the expiration of the replica timeout and the pending_requests state change. api server cache mode

Co-authored-by: Christian González <cgonzalez@opennebula.systems>
This commit is contained in:
Ruben S. Montero 2019-02-19 12:40:07 +01:00
parent fe170d0b54
commit 8fbd126ca2
8 changed files with 356 additions and 246 deletions

View File

@ -82,7 +82,7 @@ private:
class LogDB : public SqlDB
{
public:
LogDB(SqlDB * _db, bool solo, unsigned int log_retention,
LogDB(SqlDB * _db, bool solo, bool cache, unsigned int log_retention,
unsigned int limit_purge);
virtual ~LogDB();
@ -269,6 +269,11 @@ private:
*/
bool solo;
/**
* True for cache servers
*/
bool cache;
/**
* Pointer to the underlying DB store
*/

View File

@ -416,6 +416,11 @@ public:
return federation_enabled && !federation_master;
};
bool is_cache()
{
return cache;
};
int get_zone_id()
{
return zone_id;
@ -812,6 +817,7 @@ private:
bool federation_enabled;
bool federation_master;
bool cache;
int zone_id;
int server_id;
string master_oned;

View File

@ -182,7 +182,7 @@ void InformationManager::timer_action(const ActionRequest& ar)
Nebula& nd = Nebula::instance();
RaftManager * raftm = nd.get_raftm();
if ( !raftm->is_leader() && !raftm->is_solo() )
if ( !raftm->is_leader() && !raftm->is_solo() && !nd.is_cache() )
{
return;
}

View File

@ -166,13 +166,14 @@ void Nebula::start(bool bootstrap_only)
// -----------------------------------------------------------
// Init federation configuration
// -----------------------------------------------------------
const VectorAttribute * vatt = nebula_configuration->get("FEDERATION");
federation_enabled = false;
federation_master = false;
cache = false;
zone_id = 0;
server_id = -1;
master_oned = "";
const VectorAttribute * vatt = nebula_configuration->get("FEDERATION");
master_oned = vatt->vector_value("MASTER_ONED");
if (vatt != 0)
{
@ -182,23 +183,46 @@ void Nebula::start(bool bootstrap_only)
if (mode == "STANDALONE")
{
federation_enabled = false;
federation_master = false;
federation_master = false;
cache = false;
zone_id = 0;
}
else if (mode == "MASTER")
{
federation_enabled = true;
federation_master = true;
federation_master = true;
cache = false;
}
else if (mode == "SLAVE")
{
federation_enabled = true;
federation_master = false;
federation_master = false;
cache = false;
if ( master_oned.empty() )
{
throw runtime_error("MASTER_ONED endpoint is missing.");
}
}
else if (mode == "CACHE")
{
federation_enabled = false;
federation_master = false;
cache = true;
if ( master_oned.empty() )
{
throw runtime_error("MASTER_ONED is missing.");
}
}
else
{
throw runtime_error(
"FEDERATION MODE must be one of STANDALONE, MASTER, SLAVE.");
"FEDERATION MODE must be one of STANDALONE, MASTER, SLAVE or CACHE.");
}
if (federation_enabled)
@ -210,21 +234,12 @@ void Nebula::start(bool bootstrap_only)
throw runtime_error("FEDERATION ZONE_ID must be set for "
"federated instances.");
}
master_oned = vatt->vector_value("MASTER_ONED");
if (master_oned.empty() && !federation_master)
{
throw runtime_error(
"FEDERATION MASTER_ONED endpoint is missing.");
}
}
if ( vatt->vector_value("SERVER_ID", server_id) != 0 )
{
server_id = -1;
}
}
vatt = nebula_configuration->get("RAFT");
@ -331,13 +346,17 @@ void Nebula::start(bool bootstrap_only)
if ( (solo && local_bootstrap) || bootstrap_only)
{
if (cache)
{
throw runtime_error("Error getting database. An existing database is needed for CACHE mode.");
}
if ( LogDB::bootstrap(db_backend) != 0 )
{
throw runtime_error("Error bootstrapping database.");
}
}
logdb = new LogDB(db_backend, solo, log_retention, limit_purge);
logdb = new LogDB(db_backend, solo, cache, log_retention, limit_purge);
if ( federation_master )
{
@ -691,31 +710,33 @@ void Nebula::start(bool bootstrap_only)
}
// ---- Hook Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> hm_mads;
try
{
vector<const VectorAttribute *> hm_mads;
nebula_configuration->get("HM_MAD", hm_mads);
nebula_configuration->get("HM_MAD", hm_mads);
hm = new HookManager(hm_mads,vmpool);
hm = new HookManager(hm_mads,vmpool);
}
catch (bad_alloc&)
{
throw;
}
rc = hm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Hook Manager");
}
if (hm->load_mads(0) != 0)
{
goto error_mad;
}
}
catch (bad_alloc&)
{
throw;
}
rc = hm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Hook Manager");
}
if (hm->load_mads(0) != 0)
{
goto error_mad;
}
// ---- Raft Manager ----
const VectorAttribute * raft_leader_hook;
@ -734,158 +755,179 @@ void Nebula::start(bool bootstrap_only)
throw;
}
rc = raftm->start();
if ( rc != 0 )
if (!cache)
{
throw runtime_error("Could not start the Raft Consensus Manager");
rc = raftm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Raft Consensus Manager");
}
}
// ---- FedReplica Manager ----
try
if (!cache)
{
frm = new FedReplicaManager(logdb);
}
catch (bad_alloc&)
{
throw;
}
try
{
frm = new FedReplicaManager(logdb);
}
catch (bad_alloc&)
{
throw;
}
rc = frm->start();
rc = frm->start();
if ( is_federation_master() && solo )
{
// Replica threads are started on master in solo mode.
// HA start/stop the replica threads on leader/follower states
frm->start_replica_threads();
}
if ( is_federation_master() && solo )
{
// Replica threads are started on master in solo mode.
// HA start/stop the replica threads on leader/follower states
frm->start_replica_threads();
}
if ( rc != 0 )
{
throw runtime_error("Could not start the Federation Replica Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Federation Replica Manager");
}
}
// ---- Virtual Machine Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> vmm_mads;
int vm_limit;
try
{
vector<const VectorAttribute *> vmm_mads;
int vm_limit;
bool do_poll;
bool do_poll;
nebula_configuration->get("VM_PER_INTERVAL", vm_limit);
nebula_configuration->get("VM_PER_INTERVAL", vm_limit);
nebula_configuration->get("VM_MAD", vmm_mads);
nebula_configuration->get("VM_MAD", vmm_mads);
nebula_configuration->get("VM_INDIVIDUAL_MONITORING", do_poll);
nebula_configuration->get("VM_INDIVIDUAL_MONITORING", do_poll);
vmm = new VirtualMachineManager(
timer_period,
monitor_interval_vm,
do_poll,
vm_limit,
vmm_mads);
}
catch (bad_alloc&)
{
throw;
}
vmm = new VirtualMachineManager(
timer_period,
monitor_interval_vm,
do_poll,
vm_limit,
vmm_mads);
}
catch (bad_alloc&)
{
throw;
}
rc = vmm->start();
rc = vmm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Virtual Machine Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Virtual Machine Manager");
}
}
// ---- Life-cycle Manager ----
try
if (!cache)
{
lcm = new LifeCycleManager();
}
catch (bad_alloc&)
{
throw;
}
try
{
lcm = new LifeCycleManager();
}
catch (bad_alloc&)
{
throw;
}
rc = lcm->start();
rc = lcm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Life-cycle Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Life-cycle Manager");
}
}
// ---- Information Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> im_mads;
try
{
vector<const VectorAttribute *> im_mads;
int host_limit;
int monitor_threads;
int host_limit;
int monitor_threads;
nebula_configuration->get("HOST_PER_INTERVAL", host_limit);
nebula_configuration->get("HOST_PER_INTERVAL", host_limit);
nebula_configuration->get("MONITORING_THREADS", monitor_threads);
nebula_configuration->get("MONITORING_THREADS", monitor_threads);
nebula_configuration->get("IM_MAD", im_mads);
nebula_configuration->get("IM_MAD", im_mads);
im = new InformationManager(hpool,
clpool,
timer_period,
monitor_interval_host,
host_limit,
monitor_threads,
remotes_location,
im_mads);
}
catch (bad_alloc&)
{
throw;
}
im = new InformationManager(hpool,
clpool,
timer_period,
monitor_interval_host,
host_limit,
monitor_threads,
remotes_location,
im_mads);
}
catch (bad_alloc&)
{
throw;
}
rc = im->start();
rc = im->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Information Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Information Manager");
}
}
// ---- Transfer Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> tm_mads;
try
{
vector<const VectorAttribute *> tm_mads;
nebula_configuration->get("TM_MAD", tm_mads);
nebula_configuration->get("TM_MAD", tm_mads);
tm = new TransferManager(vmpool, hpool, tm_mads);
}
catch (bad_alloc&)
{
throw;
}
tm = new TransferManager(vmpool, hpool, tm_mads);
}
catch (bad_alloc&)
{
throw;
}
rc = tm->start();
rc = tm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Transfer Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Transfer Manager");
}
}
// ---- Dispatch Manager ----
try
if (!cache)
{
dm = new DispatchManager();
}
catch (bad_alloc&)
{
throw;
}
try
{
dm = new DispatchManager();
}
catch (bad_alloc&)
{
throw;
}
rc = dm->start();
rc = dm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Dispatch Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Dispatch Manager");
}
}
// ---- Auth Manager ----
@ -920,70 +962,79 @@ void Nebula::start(bool bootstrap_only)
}
// ---- Image Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> image_mads;
try
{
vector<const VectorAttribute *> image_mads;
nebula_configuration->get("DATASTORE_MAD", image_mads);
nebula_configuration->get("DATASTORE_MAD", image_mads);
imagem = new ImageManager(timer_period,
monitor_interval_datastore,
ipool,
dspool,
image_mads);
}
catch (bad_alloc&)
{
throw;
}
imagem = new ImageManager(timer_period,
monitor_interval_datastore,
ipool,
dspool,
image_mads);
}
catch (bad_alloc&)
{
throw;
}
rc = imagem->start();
rc = imagem->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Image Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Image Manager");
}
}
// ---- Marketplace Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> mmads ;
try
{
vector<const VectorAttribute *> mmads ;
nebula_configuration->get("MARKET_MAD", mmads);
nebula_configuration->get("MARKET_MAD", mmads);
marketm = new MarketPlaceManager(timer_period, monitor_interval_market, mmads);
}
catch (bad_alloc&)
{
throw;
}
marketm = new MarketPlaceManager(timer_period, monitor_interval_market, mmads);
}
catch (bad_alloc&)
{
throw;
}
rc = marketm->start();
rc = marketm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the Marketplace Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the Marketplace Manager");
}
}
// ---- IPAM Manager ----
try
if (!cache)
{
vector<const VectorAttribute *> ipam_mads ;
try
{
vector<const VectorAttribute *> ipam_mads ;
nebula_configuration->get("IPAM_MAD", ipam_mads);
nebula_configuration->get("IPAM_MAD", ipam_mads);
ipamm = new IPAMManager(timer_period, ipam_mads);
}
catch (bad_alloc&)
{
throw;
}
ipamm = new IPAMManager(timer_period, ipam_mads);
}
catch (bad_alloc&)
{
throw;
}
rc = ipamm->start();
rc = ipamm->start();
if ( rc != 0 )
{
throw runtime_error("Could not start the IPAM Manager");
if ( rc != 0 )
{
throw runtime_error("Could not start the IPAM Manager");
}
}
// -----------------------------------------------------------
@ -994,34 +1045,37 @@ void Nebula::start(bool bootstrap_only)
rc = 0;
if (vmm->load_mads(0) != 0)
if (!cache)
{
goto error_mad;
}
if (vmm->load_mads(0) != 0)
{
goto error_mad;
}
if (im->load_mads(0) != 0)
{
goto error_mad;
}
if (im->load_mads(0) != 0)
{
goto error_mad;
}
if (tm->load_mads(0) != 0)
{
goto error_mad;
}
if (tm->load_mads(0) != 0)
{
goto error_mad;
}
if (imagem->load_mads(0) != 0)
{
goto error_mad;
}
if (imagem->load_mads(0) != 0)
{
goto error_mad;
}
if (marketm->load_mads(0) != 0)
{
goto error_mad;
}
if (marketm->load_mads(0) != 0)
{
goto error_mad;
}
if (ipamm->load_mads(0) != 0)
{
goto error_mad;
if (ipamm->load_mads(0) != 0)
{
goto error_mad;
}
}
if ( authm != 0 )
@ -1075,11 +1129,14 @@ void Nebula::start(bool bootstrap_only)
// ---- Initialize Manager cross-reference pointers and pool references ----
dm->init_managers();
if (!cache)
{
dm->init_managers();
lcm->init_managers();
lcm->init_managers();
marketm->init_managers();
marketm->init_managers();
}
// ---- Start the Request Manager ----
@ -1111,44 +1168,44 @@ void Nebula::start(bool bootstrap_only)
// Stop the managers & free resources
// -----------------------------------------------------------
vmm->finalize();
lcm->finalize();
if (!cache)
{
vmm->finalize();
lcm->finalize();
tm->finalize();
dm->finalize();
tm->finalize();
dm->finalize();
im->finalize();
im->finalize();
hm->finalize();
imagem->finalize();
marketm->finalize();
ipamm->finalize();
frm->finalize();
//sleep to wait drivers???
pthread_join(vmm->get_thread_id(),0);
pthread_join(lcm->get_thread_id(),0);
pthread_join(tm->get_thread_id(),0);
pthread_join(dm->get_thread_id(),0);
pthread_join(im->get_thread_id(),0);
pthread_join(hm->get_thread_id(),0);
pthread_join(imagem->get_thread_id(),0);
pthread_join(marketm->get_thread_id(),0);
pthread_join(ipamm->get_thread_id(),0);
pthread_join(frm->get_thread_id(),0);
}
raftm->finalize();
aclm->finalize();
rm->finalize();
authm->finalize();
hm->finalize();
imagem->finalize();
marketm->finalize();
ipamm->finalize();
aclm->finalize();
raftm->finalize();
frm->finalize();
//sleep to wait drivers???
pthread_join(vmm->get_thread_id(),0);
pthread_join(lcm->get_thread_id(),0);
pthread_join(tm->get_thread_id(),0);
pthread_join(dm->get_thread_id(),0);
pthread_join(im->get_thread_id(),0);
pthread_join(rm->get_thread_id(),0);
pthread_join(hm->get_thread_id(),0);
pthread_join(authm->get_thread_id(),0);
pthread_join(imagem->get_thread_id(),0);
pthread_join(marketm->get_thread_id(),0);
pthread_join(ipamm->get_thread_id(),0);
pthread_join(aclm->get_thread_id(),0);
pthread_join(raftm->get_thread_id(),0);
pthread_join(frm->get_thread_id(),0);
if(is_federation_slave())
{
pthread_join(aclm->get_thread_id(),0);
}
//XML Library
xmlCleanupParser();

View File

@ -601,7 +601,7 @@ void RaftManager::replicate_log(ReplicaRequest * request)
{
to_commit--;
}
else if ( rindex == (int) it->second )
else
{
replica_manager.replicate(it->first);
}
@ -789,6 +789,13 @@ void RaftManager::timer_action(const ActionRequest& ar)
static int purge_tics = 0;
ostringstream oss;
Nebula& nd = Nebula::instance();
if ( nd.is_cache() )
{
return;
}
mark_tics++;
purge_tics++;
@ -802,7 +809,6 @@ void RaftManager::timer_action(const ActionRequest& ar)
// Database housekeeping
if ( (purge_tics * timer_period_ms) >= purge_period_ms )
{
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
int rc = logdb->purge_log();

View File

@ -383,11 +383,15 @@ void Request::execute(
return;
}
if ( raftm->is_follower() && leader_only)
if ( (raftm->is_follower() || nd.is_cache()) && leader_only)
{
string leader_endpoint, error;
if ( raftm->get_leader_endpoint(leader_endpoint) != 0 )
if ( nd.is_cache() )
{
leader_endpoint = nd.get_master_oned();
}
else if ( raftm->get_leader_endpoint(leader_endpoint) != 0 )
{
att.resp_msg = "Cannot process request, no leader found";
failure_response(INTERNAL, att);

View File

@ -321,6 +321,15 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
if ( nd.is_cache() )
{
att.resp_msg = "Server is in cache mode.";
att.resp_id = 0;
failure_response(ACTION, att);
return;
}
if ( leader_term < current_term )
{
std::ostringstream oss;
@ -472,6 +481,15 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
if ( nd.is_cache() )
{
att.resp_msg = "Server is in cache mode.";
att.resp_id = 0;
failure_response(ACTION, att);
return;
}
if ( candidate_term < current_term )
{
att.resp_msg = "Candidate's term is outdated";
@ -562,6 +580,15 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
if ( nd.is_cache() )
{
att.resp_msg = "Server is in cache mode.";
att.resp_id = -1;
failure_response(ACTION, att);
return;
}
if ( sql.empty() )
{
oss << "Received an empty SQL command at index" << index;

View File

@ -104,8 +104,8 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret, unsigned int _lp):
solo(_solo), db(_db), next_index(0), last_applied(-1), last_index(-1),
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, unsigned int _lret, unsigned int _lp):
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1), last_index(-1),
last_term(-1), log_retention(_lret), limit_purge(_lp)
{
int r, i;
@ -494,6 +494,11 @@ int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
return rc;
}
else if ( cache )
{
NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB in caching mode");
return -1;
}
else if ( raftm == 0 || !raftm->is_leader() )
{
NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");