1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-02-26 09:57:23 +03:00

Add multithead Information Manager to process monitor messages

(cherry picked from commit 042ba56731720c4d7973015b5cc89945bebed8d3)
This commit is contained in:
Ruben S. Montero 2013-11-17 17:41:46 +01:00
parent 7656a7b8f1
commit 9f691779cd
6 changed files with 400 additions and 189 deletions

View File

@ -22,12 +22,11 @@
#include <sstream>
#include "Mad.h"
#include "HostPool.h"
#include "DatastorePool.h"
using namespace std;
class MonitorThreadPool;
/**
* InformationManagerDriver provides a base class to implement IM
* Drivers. This class implements the protocol and recover functions
@ -41,10 +40,9 @@ public:
InformationManagerDriver(
int userid,
const map<string,string>& attrs,
bool sudo,
HostPool * pool);
bool sudo);
virtual ~InformationManagerDriver(){};
virtual ~InformationManagerDriver();
/**
* Implements the IM driver protocol.
@ -75,14 +73,9 @@ public:
void stop_monitor(int oid, const string& host) const;
private:
/**
* Pointer to the Virtual Machine Pool, to access VMs
*/
HostPool * hpool;
DatastorePool * dspool;
friend class InformationManager;
MonitorThreadPool * mtpool;
};
/* -------------------------------------------------------------------------- */

102
include/MonitorThread.h Normal file
View File

@ -0,0 +1,102 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#ifndef MONITOR_THREAD_H_
#define MONITOR_THREAD_H_
#include <string>
#include <pthread.h>
class HostPool;
class DatastorePool;
class LifeCycleManager;
class MonitorThreadPool;
extern "C" void * do_message_thread(void *arg);
class MonitorThread
{
private:
friend class MonitorThreadPool;
friend void * do_message_thread(void *arg);
MonitorThread(int hid, std::string res, std::string inf):host_id(hid),
result(res), hinfo64(inf){};
~MonitorThread(){};
void do_message();
// Message variables
int host_id;
std::string result;
std::string hinfo64;
// Pointers shared by all the MonitorThreads, init by MonitorThreadPool
static HostPool * hpool;
static DatastorePool * dspool;
static LifeCycleManager *lcm;
static MonitorThreadPool * mthpool;
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
class MonitorThreadPool
{
public:
MonitorThreadPool(int num_threads);
~MonitorThreadPool(){};
/**
* Creates a new thread to parse and process a monitor message
* @param hid host id
* @param result of the monitor operation
* @oaram hinfo the information sent by the driver
*/
void do_message(int hid, const std::string& result, const std::string& hinfo);
/**
* Terminates a running thread, and signal the main control thread.
*/
void exit_monitor_thread();
private:
int concurrent_threads; /**< Max number of concurrent threads*/
int running_threads; /**< Number of running threads*/
//Concurrency control variables
pthread_mutex_t mutex;
pthread_cond_t cond;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
#endif /*MONITOR_THREAD_H_*/

View File

@ -71,7 +71,7 @@ int InformationManager::load_mads(int uid)
NebulaLog::log("InM",Log::INFO,oss);
im_mad = new InformationManagerDriver(0,vattr->value(),false,hpool);
im_mad = new InformationManagerDriver(0,vattr->value(),false);
rc = add(im_mad);

View File

@ -19,6 +19,8 @@
#include "Nebula.h"
#include "NebulaUtil.h"
#include "VirtualMachineManagerDriver.h"
#include "MonitorThread.h"
#include <sstream>
/* -------------------------------------------------------------------------- */
@ -27,13 +29,19 @@
InformationManagerDriver::InformationManagerDriver(
int userid,
const map<string,string>& attrs,
bool sudo,
HostPool * pool):
Mad(userid,attrs,sudo),hpool(pool)
bool sudo):
Mad(userid,attrs,sudo)
{
dspool = Nebula::instance().get_dspool();
mtpool = new MonitorThreadPool(100);
};
InformationManagerDriver::~InformationManagerDriver()
{
if (mtpool != 0)
{
delete mtpool;
}
};
/* ************************************************************************** */
/* Driver ASCII Protocol Implementation */
@ -75,13 +83,8 @@ void InformationManagerDriver::protocol(const string& message) const
ostringstream ess;
Host * host;
set<int> vm_ids;
string hinfo64;
string* hinfo;
// Parse the driver message
if ( is.good() )
@ -111,31 +114,13 @@ void InformationManagerDriver::protocol(const string& message) const
goto error_parse;
}
// -----------------------
// -------------------------------------------------------------------------
// Protocol implementation
// -----------------------
// -------------------------------------------------------------------------
if ( action == "MONITOR" )
{
bool vm_poll;
set<int> lost;
map<int,string> found;
set<int> non_shared_ds;
map<int,const VectorAttribute*> datastores;
Template tmpl;
Datastore * ds;
map<int, const VectorAttribute*>::iterator itm;
int rc;
// ---------------------------------------------------------------------
// Get information from driver and decode from base64
// ---------------------------------------------------------------------
string hinfo64;
getline (is, hinfo64);
@ -144,144 +129,7 @@ void InformationManagerDriver::protocol(const string& message) const
return;
}
host = hpool->get(id,true);
if ( host == 0 )
{
goto error_host;
}
hinfo = one_util::base64_decode(hinfo64);
// ---------------------------------------------------------------------
// Monitoring Error
// ---------------------------------------------------------------------
if (result != "SUCCESS")
{
set<int> vm_ids;
host->error_info(*hinfo, vm_ids);
Nebula &ne = Nebula::instance();
LifeCycleManager *lcm = ne.get_lcm();
for (set<int>::iterator it = vm_ids.begin(); it != vm_ids.end(); it++)
{
lcm->trigger(LifeCycleManager::MONITOR_DONE, *it);
}
delete hinfo;
hpool->update(host);
host->unlock();
return;
}
// ---------------------------------------------------------------------
// Get DS Information from Moniroting Information
// ---------------------------------------------------------------------
rc = host->extract_ds_info(*hinfo, tmpl, datastores);
delete hinfo;
host->unlock();
if (rc != 0)
{
return;
}
for (itm = datastores.begin(); itm != datastores.end(); itm++)
{
ds = dspool->get(itm->first, true);
if (ds == 0)
{
continue;
}
if (ds->get_type() == Datastore::SYSTEM_DS)
{
if (ds->is_shared())
{
float total = 0, free = 0, used = 0;
ostringstream oss;
(itm->second)->vector_value("TOTAL_MB", total);
(itm->second)->vector_value("FREE_MB", free);
(itm->second)->vector_value("USED_MB", used);
ds->update_monitor(total, free, used);
oss << "Datastore " << ds->get_name() <<
" (" << ds->get_oid() << ") successfully monitored.";
NebulaLog::log("ImM", Log::DEBUG, oss);
dspool->update(ds);
}
else
{
non_shared_ds.insert(itm->first);
}
}
ds->unlock();
}
// ---------------------------------------------------------------------
// Parse Host information
// ---------------------------------------------------------------------
host = hpool->get(id,true);
if ( host == 0 )
{
delete hinfo;
goto error_host;
}
rc = host->update_info(tmpl, vm_poll, lost, found, non_shared_ds);
hpool->update(host);
if (rc != 0)
{
host->unlock();
return;
}
hpool->update_monitoring(host);
ess << "Host " << host->get_name() << " (" << host->get_oid() << ")"
<< " successfully monitored.";
NebulaLog::log("InM", Log::DEBUG, ess);
host->unlock();
if (vm_poll)
{
set<int>::iterator its;
map<int,string>::iterator itm;
Nebula &ne = Nebula::instance();
LifeCycleManager *lcm = ne.get_lcm();
for (its = lost.begin(); its != lost.end(); its++)
{
lcm->trigger(LifeCycleManager::MONITOR_DONE, *its);
}
for (itm = found.begin(); itm != found.end(); itm++)
{
VirtualMachineManagerDriver::process_poll(itm->first, itm->second);
}
}
mtpool->do_message(id, result, hinfo64);
}
else if (action == "LOG")
{
@ -308,12 +156,6 @@ void InformationManagerDriver::protocol(const string& message) const
return;
error_host:
ess << "Could not get host " << id;
NebulaLog::log("InM",Log::ERROR,ess);
return;
error_parse:
ess << "Error while parsing driver message: " << message;
NebulaLog::log("InM",Log::ERROR,ess);

273
src/im/MonitorThread.cc Normal file
View File

@ -0,0 +1,273 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "MonitorThread.h"
#include <map>
#include <set>
#include "Nebula.h"
#include "NebulaUtil.h"
using namespace std;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
HostPool * MonitorThread::hpool;
DatastorePool * MonitorThread::dspool;
LifeCycleManager * MonitorThread::lcm;
MonitorThreadPool * MonitorThread::mthpool;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
extern "C" void * do_message_thread(void *arg)
{
MonitorThread * mt = static_cast<MonitorThread *>(arg);
mt->do_message();
MonitorThread::mthpool->exit_monitor_thread();
delete mt;
return 0;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void MonitorThread::do_message()
{
// -------------------------------------------------------------------------
// Decode from base64
// -------------------------------------------------------------------------
string* hinfo = one_util::base64_decode(hinfo64);
Host* host = hpool->get(host_id,true);
if ( host == 0 )
{
delete hinfo;
return;
}
// -------------------------------------------------------------------------
// Monitoring Error
// -------------------------------------------------------------------------
if (result != "SUCCESS")
{
set<int> vm_ids;
host->error_info(*hinfo, vm_ids);
for (set<int>::iterator it = vm_ids.begin(); it != vm_ids.end(); it++)
{
lcm->trigger(LifeCycleManager::MONITOR_DONE, *it);
}
delete hinfo;
hpool->update(host);
host->unlock();
return;
}
// -------------------------------------------------------------------------
// Get DS Information from Moniroting Information
// -------------------------------------------------------------------------
map<int,const VectorAttribute*> datastores;
map<int, const VectorAttribute*>::iterator itm;
Template tmpl;
Datastore * ds;
set<int> non_shared_ds;
int rc = host->extract_ds_info(*hinfo, tmpl, datastores);
delete hinfo;
host->unlock();
if (rc != 0)
{
return;
}
for (itm = datastores.begin(); itm != datastores.end(); itm++)
{
ds = dspool->get(itm->first, true);
if (ds == 0)
{
continue;
}
if (ds->get_type() == Datastore::SYSTEM_DS)
{
if (ds->is_shared())
{
float total = 0, free = 0, used = 0;
ostringstream oss;
(itm->second)->vector_value("TOTAL_MB", total);
(itm->second)->vector_value("FREE_MB", free);
(itm->second)->vector_value("USED_MB", used);
ds->update_monitor(total, free, used);
oss << "Datastore " << ds->get_name() << " (" << ds->get_oid()
<< ") successfully monitored.";
NebulaLog::log("ImM", Log::DEBUG, oss);
dspool->update(ds);
}
else
{
non_shared_ds.insert(itm->first);
}
}
ds->unlock();
}
// -------------------------------------------------------------------------
// Parse Host information
// -------------------------------------------------------------------------
bool vm_poll;
set<int> lost;
map<int,string> found;
ostringstream oss;
host = hpool->get(host_id,true);
if ( host == 0 )
{
return;
}
rc = host->update_info(tmpl, vm_poll, lost, found, non_shared_ds);
hpool->update(host);
if (rc != 0)
{
host->unlock();
return;
}
hpool->update_monitoring(host);
oss << "Host " << host->get_name() << " (" << host->get_oid() << ")"
<< " successfully monitored.";
NebulaLog::log("InM", Log::DEBUG, oss);
host->unlock();
if (vm_poll)
{
set<int>::iterator its;
map<int,string>::iterator itm;
for (its = lost.begin(); its != lost.end(); its++)
{
lcm->trigger(LifeCycleManager::MONITOR_DONE, *its);
}
for (itm = found.begin(); itm != found.end(); itm++)
{
VirtualMachineManagerDriver::process_poll(itm->first, itm->second);
}
}
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
MonitorThreadPool::MonitorThreadPool(int max_thr):concurrent_threads(max_thr),
running_threads(0)
{
//Initialize the MonitorThread constants
MonitorThread::dspool = Nebula::instance().get_dspool();
MonitorThread::hpool = Nebula::instance().get_hpool();
MonitorThread::lcm = Nebula::instance().get_lcm();
MonitorThread::mthpool= this;
//Initialize concurrency variables
pthread_mutex_init(&mutex,0);
pthread_cond_init(&cond,0);
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void MonitorThreadPool::do_message(int hid, const string& result,
const string& hinfo)
{
pthread_attr_t attr;
pthread_t id;
pthread_mutex_lock(&mutex);
while (running_threads >= concurrent_threads)
{
pthread_cond_wait(&cond, &mutex);
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
MonitorThread * mt = new MonitorThread(hid, result, hinfo);
running_threads++;
pthread_create(&id, &attr, do_message_thread, (void *)mt);
pthread_attr_destroy(&attr);
pthread_mutex_unlock(&mutex);
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void MonitorThreadPool::exit_monitor_thread()
{
pthread_mutex_lock(&mutex);
running_threads--;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
};

View File

@ -23,7 +23,8 @@ lib_name='nebula_im'
# Sources to generate the library
source_files=[
'InformationManager.cc',
'InformationManagerDriver.cc'
'InformationManagerDriver.cc',
'MonitorThread.cc'
]
# Build library