1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-27 03:21:29 +03:00

feature #212: Scheduler and Policies now uses the XML-RPC pools

This commit is contained in:
Carlos Martin and Ruben S. Montero 2010-05-14 16:38:12 +02:00 committed by Ruben S. Montero
parent 6b03c3ffab
commit 209882f44e
8 changed files with 266 additions and 375 deletions

30
src/log/SConstruct Normal file
View File

@ -0,0 +1,30 @@
# SConstruct for src/log
# -------------------------------------------------------------------------- #
# Copyright 2002-2010, OpenNebula Project Leads (OpenNebula.org) #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
Import('env')
lib_name='nebula_log'
# Sources to generate the library
source_files=[
'NebulaLog.cc',
'Log.cc'
]
# Build library
env.StaticLibrary(lib_name, source_files)

View File

@ -34,7 +34,8 @@ add_bison(main_env)
# Include dirs
main_env.Append(CPPPATH=[
cwd+'/include/',
cwd + '/include/',
cwd + '../../../include/'
])
# Library dirs
@ -91,6 +92,7 @@ main_env.ParseConfig('xml2-config --libs --cflags')
build_scripts=[
'src/xml/SConstruct',
'src/pool/SConstruct',
'src/sched/SConstruct'
]
for script in build_scripts:

View File

@ -26,22 +26,16 @@ using namespace std;
class PoolXML : public ObjectXML
{
protected:
// ------------------------------------------------------------------------
PoolXML(Client* client):ObjectXML()
public:
/**
*
*
*/
const map<int, ObjectXML*>& get_objects() const
{
this->client = client;
return objects;
};
virtual ~PoolXML()
{
flush();
};
// ------------------------------------------------------------------------
/**
* Set ups the pool by performing the following actions:
* - All the objects stored in the pool are flushed
@ -127,15 +121,22 @@ protected:
}
};
/**
*
*
*/
const map<int, ObjectXML*>& get_objects() const
protected:
// ------------------------------------------------------------------------
PoolXML(Client* client):ObjectXML()
{
return objects;
this->client = client;
};
virtual ~PoolXML()
{
flush();
};
// ------------------------------------------------------------------------
/**
* Inserts a new ObjectXML into the objects map
*/

View File

@ -22,68 +22,68 @@
using namespace std;
class RankPolicy : public SchedulerHostPolicy
class RankPolicy : public SchedulerHostPolicy
{
public:
RankPolicy(
SchedulerVirtualMachinePool * vmpool,
SchedulerHostPool * hpool,
VirtualMachinePoolXML * vmpool,
HostPoolXML * hpool,
float w=1.0):SchedulerHostPolicy(vmpool,hpool,w){};
~RankPolicy(){};
private:
void policy(
SchedulerVirtualMachine * vm)
{
VirtualMachineXML * vm)
{
string srank;
int rank;
char * errmsg;
int rc;
vector<int> hids;
unsigned int i;
SchedulerHost * host;
HostXML * host;
vm->get_matching_hosts(hids);
vm->get_template_attribute("RANK",srank);
srank = vm->get_rank();
if (srank == "")
{
Scheduler::log("RANK",Log::WARNING,"No rank defined for VM");
NebulaLog::log("RANK",Log::WARNING,"No rank defined for VM");
}
for (i=0;i<hids.size();i++)
{
rank = 0;
{
rank = 0;
if (srank != "")
{
host = hpool->get(hids[i],false);
host = hpool->get(hids[i]);
if ( host != 0 )
{
rc = host->rank(srank, rank, &errmsg);
if (rc != 0)
{
ostringstream oss;
oss << "Computing host rank, expression: " << srank
<< ", error: " << errmsg;
Scheduler::log("RANK",Log::ERROR,oss);
free(errmsg);
}
{
rc = host->eval_arith(srank, rank, &errmsg);
if (rc != 0)
{
ostringstream oss;
oss << "Computing host rank, expression: " << srank
<< ", error: " << errmsg;
NebulaLog::log("RANK",Log::ERROR,oss);
free(errmsg);
}
}
}
priority.push_back(rank);
priority.push_back(rank);
}
}
};

View File

@ -18,20 +18,13 @@
#define SCHEDULER_H_
#include "Log.h"
#include "SchedulerHost.h"
#include "SchedulerVirtualMachine.h"
#include "HostPoolXML.h"
#include "VirtualMachinePoolXML.h"
#include "SchedulerPolicy.h"
#include "ActionManager.h"
#include <sqlite3.h>
#include <sstream>
#include <xmlrpc-c/girerr.hpp>
#include <xmlrpc-c/base.hpp>
#include <xmlrpc-c/client_simple.hpp>
using namespace std;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -44,160 +37,114 @@ extern "C" void * scheduler_action_loop(void *arg);
class Scheduler: public ActionListener
{
public:
// ---------------------------------------------------------------
// Loggging
// ---------------------------------------------------------------
static void log(
const char * module,
const Log::MessageType type,
const ostringstream& message,
const char * filename = 0,
Log::MessageType clevel = Log::DEBUG)
{
static Log scheduler_log(filename,clevel);
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&log_mutex);
scheduler_log.log(module,type,message);
pthread_mutex_unlock(&log_mutex);
};
static void log(
const char * module,
const Log::MessageType type,
const char * message,
const char * filename = 0)
{
ostringstream os(message);
Scheduler::log(module,type,os,filename);
};
void start();
virtual void register_policies() = 0;
protected:
Scheduler(string& url, time_t _timer)
:hpool(0),vmpool(0),db(0),one_url(url),timer(_timer),threshold(0.9)
Scheduler(string& url, time_t _timer):
hpool(0),
vmpool(0),
one_url(url),
timer(_timer),
threshold(0.9),
client()
{
am.addListener(this);
};
virtual ~Scheduler()
{
{
if ( hpool != 0)
{
delete hpool;
}
if ( vmpool != 0)
{
delete vmpool;
}
if (db != 0)
{
delete db;
}
};
// ---------------------------------------------------------------
// Pools
// ---------------------------------------------------------------
SchedulerHostPool * hpool;
SchedulerVirtualMachinePool * vmpool;
HostPoolXML * hpool;
VirtualMachinePoolXML * vmpool;
// ---------------------------------------------------------------
// Scheduler Policies
// ---------------------------------------------------------------
void add_host_policy(SchedulerHostPolicy *policy)
{
host_policies.push_back(policy);
host_policies.push_back(policy);
}
// ---------------------------------------------------------------
// Scheduler main methods
// ---------------------------------------------------------------
/**
* Gets the hosts that match the requirements of the pending VMs, also
* the capacity of the host is checked. If there is enough room to host the
* VM a share vector is added to the VM.
* Gets the hosts that match the requirements of the pending VMs, also
* the capacity of the host is checked. If there is enough room to host the
* VM a share vector is added to the VM.
*/
virtual void match();
virtual void dispatch();
virtual int schedule();
virtual int set_up_pools();
private:
Scheduler(Scheduler const&){};
Scheduler& operator=(Scheduler const&){return *this;};
friend void * scheduler_action_loop(void *arg);
// ---------------------------------------------------------------
// Database
// ---------------------------------------------------------------
SqliteDB * db;
Scheduler& operator=(Scheduler const&){return *this;};
friend void * scheduler_action_loop(void *arg);
// ---------------------------------------------------------------
// Scheduling Policies
// ---------------------------------------------------------------
vector<SchedulerHostPolicy *> host_policies;
// ---------------------------------------------------------------
// Configuration attributes
// ---------------------------------------------------------------
/**
* the URL of the XML-RPC server
*/
string one_url;
time_t timer;
/**
* Threshold value to round up freecpu
*/
float threshold;
/**
* XML_RPC client
*/
Client client;
// ---------------------------------------------------------------
// Timer to periodically schedule and dispatch VMs
// ---------------------------------------------------------------
pthread_t sched_thread;
ActionManager am;
void do_action(const string &name, void *args);
// ---------------------------------------------------------------
// XML_RPC related variables
// ---------------------------------------------------------------
/**
* The authentication token
*/
string secret;
/**
* NOTE (from lib doc): "you may not have more than one object of this
* class in a program. The code is not re-entrant -- it uses global
* variables."
*/
xmlrpc_c::clientSimple xmlrpc_client;
};
#endif /*SCHEDULER_H_*/

View File

@ -17,45 +17,46 @@
#ifndef SCHEDULER_POLICY_H_
#define SCHEDULER_POLICY_H_
#include "SchedulerHost.h"
#include "SchedulerVirtualMachine.h"
#include "HostPoolXML.h"
#include "VirtualMachinePoolXML.h"
#include <cmath>
#include <algorithm>
using namespace std;
class SchedulerHostPolicy
{
public:
SchedulerHostPolicy(
SchedulerVirtualMachinePool * _vmpool,
SchedulerHostPool * _hpool,
VirtualMachinePoolXML * _vmpool,
HostPoolXML * _hpool,
float w=1.0):
vmpool(_vmpool),hpool(_hpool),sw(w){};
virtual ~SchedulerHostPolicy(){};
const vector<float>& get(
SchedulerVirtualMachine * vm)
VirtualMachineXML * vm)
{
priority.clear();
policy(vm);
if(priority.empty()!=true)
{
if(priority.empty()!=true)
{
sw.max = fabs(*max_element(
priority.begin(),
priority.end(),
SchedulerHostPolicy::abs_cmp));
transform(
priority.begin(),
priority.end(),
priority.begin(),
sw);
}
return priority;
};
@ -63,27 +64,27 @@ protected:
vector<float> priority;
virtual void policy(SchedulerVirtualMachine * vm) = 0;
SchedulerVirtualMachinePool * vmpool;
SchedulerHostPool * hpool;
virtual void policy(VirtualMachineXML * vm) = 0;
VirtualMachinePoolXML * vmpool;
HostPoolXML * hpool;
private:
static bool abs_cmp(float fl1, float fl2)
{
return fabs(fl1)<fabs(fl2);
};
//--------------------------------------------------------------------------
class ScaleWeight
//--------------------------------------------------------------------------
class ScaleWeight
{
public:
ScaleWeight(float _weight):weight(_weight){};
~ScaleWeight(){};
float operator() (float pr)
float operator() (float pr)
{
if ( max == 0 )
{
@ -97,12 +98,12 @@ private:
private:
friend class SchedulerHostPolicy;
float weight;
float max;
};
//--------------------------------------------------------------------------
ScaleWeight sw;
};

View File

@ -0,0 +1,26 @@
# SConstruct for src/pool
# -------------------------------------------------------------------------- #
# Copyright 2002-2010, OpenNebula Project Leads (OpenNebula.org) #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
Import('env')
lib_name='scheduler_sched'
source_files=['Scheduler.cc']
# Build library
env.StaticLibrary(lib_name, source_files)

View File

@ -31,8 +31,7 @@
#include "Scheduler.h"
#include "RankPolicy.h"
#include "Nebula.h"
#include "User.h"
#include "NebulaLog.h"
using namespace std;
@ -50,12 +49,12 @@ extern "C" void * scheduler_action_loop(void *arg)
sched = static_cast<Scheduler *>(arg);
Scheduler::log("SCHED",Log::INFO,"Scheduler loop started.");
NebulaLog::log("SCHED",Log::INFO,"Scheduler loop started.");
sched->am.loop(sched->timer,0);
Scheduler::log("SCHED",Log::INFO,"Scheduler loop stopped.");
NebulaLog::log("SCHED",Log::INFO,"Scheduler loop stopped.");
return 0;
}
@ -64,103 +63,49 @@ extern "C" void * scheduler_action_loop(void *arg)
void Scheduler::start()
{
int rc;
Nebula& nd = Nebula::instance();
int rc;
ifstream file;
pthread_attr_t pattr;
const char * one_auth;
string one_name;
string one_pass;
string one_token;
ifstream file;
// -----------------------------------------------------------
// Log system
// -----------------------------------------------------------
try
{
string log_fname;
ostringstream oss;
const char * nl = getenv("ONE_LOCATION");
log_fname = nd.get_log_location() + "sched.log";
if (nl == 0) //OpenNebula installed under root directory
{
oss << "/var/log/one/";
}
else
{
oss << nl << "/var/";
}
Scheduler::log("SCHED",
Log::INFO,
"Init Scheduler Log system",
log_fname.c_str());
oss << "sched.log";
NebulaLog::init_log_system(NebulaLog::FILE,
Log::DEBUG,
oss.str().c_str());
NebulaLog::log("SCHED", Log::INFO, "Init Scheduler Log system");
}
catch(runtime_error &)
{
throw;
}
one_auth = getenv("ONE_AUTH");
if (!one_auth)
{
struct passwd * pw_ent;
pw_ent = getpwuid(getuid());
if ((pw_ent != NULL) && (pw_ent->pw_dir != NULL))
{
string one_auth_file = pw_ent->pw_dir;
one_auth_file += "/.one/one_auth";
one_auth = one_auth_file.c_str();
}
else
{
throw runtime_error("Could not get one_auth file location");
}
}
file.open(one_auth);
if (file.good())
{
getline(file,one_token);
if (file.fail())
{
throw runtime_error("Error reading $ONE_AUTH file");
}
}
else
{
throw runtime_error("Could not open $ONE_AUTH file");
}
file.close();
if ( User::split_secret(one_token,one_name,one_pass) != 0 )
{
throw runtime_error("Wrong format must be <username>:<password>");
}
secret = one_name + ":" + User::sha1_digest(one_pass);
// -----------------------------------------------------------
// Pools
// -----------------------------------------------------------
try
{
string db_name = nd.get_var_location() + "one.db";
db = new SqliteDB(db_name,Scheduler::log);
}
catch (exception&)
{
throw;
}
hpool = new SchedulerHostPool(db);
vmpool = new SchedulerVirtualMachinePool(db);
hpool = new HostPoolXML(&client);
vmpool = new VirtualMachinePoolXML(&client);
// -----------------------------------------------------------
// Load scheduler policies
@ -185,7 +130,7 @@ void Scheduler::start()
fcntl(0,F_SETFD,0); // Keep them open across exec funcs
fcntl(1,F_SETFD,0);
fcntl(2,F_SETFD,0);
// -----------------------------------------------------------
// Block all signals before creating any thread
// -----------------------------------------------------------
@ -201,8 +146,8 @@ void Scheduler::start()
// Create the scheduler loop
// -----------------------------------------------------------
Scheduler::log("SCHED",Log::INFO,"Starting scheduler loop...");
NebulaLog::log("SCHED",Log::INFO,"Starting scheduler loop...");
pthread_attr_init (&pattr);
pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);
@ -210,9 +155,9 @@ void Scheduler::start()
if ( rc != 0 )
{
Scheduler::log("SCHED",Log::ERROR,
NebulaLog::log("SCHED",Log::ERROR,
"Could not start scheduler loop, exiting");
return;
}
@ -278,39 +223,36 @@ int Scheduler::set_up_pools()
void Scheduler::match()
{
SchedulerVirtualMachine * vm;
int vm_memory;
int vm_cpu;
int vm_disk;
string reqs;
VirtualMachineXML * vm;
int vm_memory;
int vm_cpu;
int vm_disk;
string reqs;
SchedulerHost * host;
int host_memory;
int host_cpu;
char * error;
bool matched;
HostXML * host;
int host_memory;
int host_cpu;
char * error;
bool matched;
int rc;
int rc;
for (unsigned int i= 0; i < vmpool->pending_vms.size(); i++)
map<int, ObjectXML*>::const_iterator vm_it;
map<int, ObjectXML*>::const_iterator h_it;
const map<int, ObjectXML*> pending_vms = vmpool->get_objects();
const map<int, ObjectXML*> hosts = hpool->get_objects();
for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
{
vm = vmpool->get(vmpool->pending_vms[i],false);
vm = static_cast<VirtualMachineXML*>(vm_it->second);
if ( vm == 0 )
reqs = vm->get_requirements();
for (h_it=hosts.begin(); h_it != hosts.end(); h_it++)
{
continue;
}
vm->get_template_attribute("REQUIREMENTS",reqs);
for (unsigned int j=0;j<hpool->hids.size();j++)
{
host = hpool->get(hpool->hids[j],false);
if ( host == 0 )
{
continue;
}
host = static_cast<HostXML *>(h_it->second);
// -----------------------------------------------------------------
// Evaluate VM requirements
@ -318,7 +260,7 @@ void Scheduler::match()
if (reqs != "")
{
rc = host->match(reqs,matched,&error);
rc = host->eval_bool(reqs,matched,&error);
if ( rc != 0 )
{
@ -328,7 +270,7 @@ void Scheduler::match()
oss << "Error evaluating expresion: " << reqs
<< ", error: " << error;
Scheduler::log("HOST",Log::ERROR,oss);
NebulaLog::log("SCHED",Log::ERROR,oss);
free(error);
}
@ -375,25 +317,20 @@ static float sum_operator (float i, float j)
int Scheduler::schedule()
{
vector<SchedulerHostPolicy *>::iterator it;
vector<int>::iterator jt;
vector<int>::iterator kt;
SchedulerVirtualMachine * vm;
ostringstream oss;
VirtualMachineXML * vm;
ostringstream oss;
vector<float> total;
vector<float> policy;
for (jt=vmpool->pending_vms.begin();jt!=vmpool->pending_vms.end();jt++)
map<int, ObjectXML*>::const_iterator vm_it;
const map<int, ObjectXML*> pending_vms = vmpool->get_objects();
for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
{
vm = vmpool->get(*jt,false);
if ( vm == 0 )
{
oss << "Can not get VM id=" << *jt;
Scheduler::log("HOST",Log::ERROR,oss);
continue;
}
vm = static_cast<VirtualMachineXML*>(vm_it->second);
total.clear();
@ -427,90 +364,37 @@ int Scheduler::schedule()
void Scheduler::dispatch()
{
vector<int>::iterator it;
SchedulerVirtualMachine * vm;
ostringstream oss;
VirtualMachineXML * vm;
ostringstream oss;
int hid;
int rc;
map<int, ObjectXML*>::const_iterator vm_it;
const map<int, ObjectXML*> pending_vms = vmpool->get_objects();
oss << "Select hosts" << endl;
oss << "\tPRI\tHID" << endl;
oss << "\t-------------------" << endl;
for (it=vmpool->pending_vms.begin();it!=vmpool->pending_vms.end();it++)
for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
{
vm = vmpool->get(*it,false);
if ( vm != 0 )
{
oss << "Virtual Machine: " << vm->get_oid() << "\n" << *vm << endl;
}
vm = static_cast<VirtualMachineXML*>(vm_it->second);
oss << "Virtual Machine: " << vm->get_oid() << "\n" << *vm << endl;
}
Scheduler::log("SCHED",Log::INFO,oss);
NebulaLog::log("SCHED",Log::INFO,oss);
for (it=vmpool->pending_vms.begin();it!=vmpool->pending_vms.end();it++)
for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
{
vm = vmpool->get(*it,false);
if ( vm == 0 )
{
continue;
}
vm = static_cast<VirtualMachineXML*>(vm_it->second);
rc = vm->get_host(hid,hpool);
if (rc == 0)
{
xmlrpc_c::value deploy_result;
oss.str("");
oss << "Dispatching virtual machine " << vm->get_oid()
<< " to HID: " << hid;
Scheduler::log("SCHED",Log::INFO,oss);
// Tell ONE about the decision
try
{
xmlrpc_client.call(
one_url,
"one.vm.deploy",
"sii",
&deploy_result,
secret.c_str(),
vm->get_oid(),
hid);
}
catch (exception &e)
{
oss.str("");
oss << "Exception raised: " << e.what() << '\n';
Scheduler::log("SCHED",Log::ERROR,oss);
break;
}
// See how ONE handled the deployment
xmlrpc_c::value_array result(deploy_result);
vector<xmlrpc_c::value> const param_array(result.vectorValueValue());
xmlrpc_c::value_boolean const result_correct(param_array[0]);
if ( static_cast<bool>(result_correct) != true )
{
xmlrpc_c::value_string const info(param_array[1]);
oss.str("");
oss << "Error deploying virtual machine " << vm->get_oid()
<< " to HID: " << hid
<< ". Reason: " << static_cast<string>(info);
Scheduler::log("SCHED",Log::ERROR,oss);
}
//vm->dispatch(hid,client);
}
}
}
@ -542,6 +426,6 @@ void Scheduler::do_action(const string &name, void *args)
}
else if (name == ACTION_FINALIZE)
{
Scheduler::log("SCHED",Log::INFO,"Stopping the scheduler...");
NebulaLog::log("SCHED",Log::INFO,"Stopping the scheduler...");
}
}