1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-21 09:33:53 +03:00

F #6275: External scheduler API

The connection to an external scheduler module is configured in sched.conf:

EXTERNAL_SCHEDULER = [
   SERVER  = "http://localhost:4567",
   PROXY   = "",
   TIMEOUT = 10
]

The API post on '/' the list of VMs, their pre-selected list of
candidate hosts based on REQUIREMENTS along with the VM information
(CAPACITY, TEMPLATE and USER_TEMPLATE).

Example:
{
  "VMS": [
    {
      "CAPACITY": {
        "CPU": 1.5,
        "DISK_SIZE": 1024,
        "MEMORY": 131072
      },
      "HOST_IDS": [
        3,
        4,
        5
      ],
      "ID": 32,
      "STATE": "PENDING",
      "TEMPLATE": {
        "AUTOMATIC_DS_REQUIREMENTS": "(\"CLUSTERS/ID\" @> 0)",
        "AUTOMATIC_NIC_REQUIREMENTS": "(\"CLUSTERS/ID\" @> 0)",
        "AUTOMATIC_REQUIREMENTS": "(CLUSTER_ID = 0) & !(PUBLIC_CLOUD = YES) & !(PIN_POLICY = PINNED)",
        "CPU": "1.5",
        "MEMORY": "128",
        ...
      },
      "USER_TEMPLATE": {}
    },
    {
      "CAPACITY": {
        "CPU": 1.5,
        "DISK_SIZE": 1024,
        "MEMORY": 131072
      },
      "HOST_IDS": [
        3,
        4,
        5
      ],
      "ID": 33,
      "STATE": "PENDING",
      "TEMPLATE": {
       ...
      },
      "USER_TEMPLATE": {}
    }
  ]
}

The scheduler needs to respond to this post action with a simple list of
the allocation for each VM:

{
  "VMS": [
    {
      "ID": 32,
      "HOST_ID": 2
    },
    {
      "ID": 33,
      "HOST_ID": 0
    }
  ]
}

This commits vendorize Vendorize nlohmann-json (MIT license)
This commit is contained in:
Ruben S. Montero 2023-08-29 17:11:01 +02:00
parent ee8c6bd8be
commit dca50b2bb9
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
10 changed files with 23385 additions and 19 deletions

124
include/HttpRequest.h Normal file
View File

@ -0,0 +1,124 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2023, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include <curl/curl.h>
#include <string>
/* Class for sedning Http requests and receiving responses.
* Note: Very simple implementation:
* - no consistency check
* - no checks if the response is in correct format
* - almost no error handling
*/
class HttpRequest
{
public:
HttpRequest();
~HttpRequest();
/** Send a POST request, receive response as JSON
* @param url Address
* @param data Data to send in json format
* @param response In case of success full response from server in json format,
* the caller should extract the data from { json : {...} }.
* Contains error string in case of failure.
* @return 0 on success, -1 otherwise
*/
int post_json(const std::string& url, const std::string& data, std::string& response);
int post_json(const std::string& data, std::string& response)
{
return post_json(_url, data, response);
}
/** Send a GET request, receive response as JSON
* @param url Address
* @param response In case of success full response from server in json format,
* the caller should extract the data from { json : {...} }.
* Contains error string in case of failure.
* @return 0 on success, -1 otherwise
*/
int get_json(const std::string& url, std::string& response);
int get_json(std::string& response)
{
return get_json(_url, response);
}
bool is_initialized() const
{
return !_url.empty();
}
/**
* Set server url adress, in the form "scheme://host:port/path "
*/
void set_server(const std::string& url)
{
_url = url;
}
/**
* Set maximum time in seconds that transfer operation can take.
* 0 -> Use curl default value
* See curl CURLOPT_TIMEOUT for more info
*/
void set_timeout(long timeout)
{
_timeout = timeout;
}
/**
* Set proxy server, including protocol and port. Example "http://example.com:1234"
* See curl CURLOPT_PROXY for more info
*/
void set_proxy(const std::string& proxy)
{
_proxy = proxy;
}
private:
/**
* Callback method for writing response of curl operation to string
* See curl CURLOPT_WRITEFUNCTION for more info
*/
static size_t write_to_string(void *ptr, size_t size, size_t count, void *str);
/**
* Check curl response, eventually set error message
* @param curl Pointer to curl handle
* @param error Contains error message if any
* @return 0 on success, -1 otherwise
*/
static int check_http_code(CURL* curl, std::string& msg);
/**
* Server url adress
*/
std::string _url;
/**
* Maximum time in seconds that transfer operation can take.
* 0 -> Use curl default value
*/
long _timeout = 0;
/**
* Curl proxy server, including the protocol and port.
*/
std::string _proxy;
};

22875
share/vendor/nlohmann/json.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

152
src/common/HttpRequest.cc Normal file
View File

@ -0,0 +1,152 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2023, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "HttpRequest.h"
#include <iostream>
using namespace std;
HttpRequest::HttpRequest()
{
curl_global_init(CURL_GLOBAL_ALL);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
HttpRequest::~HttpRequest()
{
curl_global_cleanup();
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int HttpRequest::post_json(const std::string& url, const std::string& data, std::string& response)
{
auto curl = curl_easy_init();
if (!curl) return -1;
auto headers = curl_slist_append(nullptr, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_to_string);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
if (_timeout != 0)
{
curl_easy_setopt(curl, CURLOPT_TIMEOUT, _timeout);
}
if (!_proxy.empty())
{
curl_easy_setopt(curl, CURLOPT_PROXY, _proxy);
}
auto ec = curl_easy_perform(curl);
if (ec != CURLE_OK)
{
response = curl_easy_strerror(ec);
curl_easy_cleanup(curl);
return -1;
}
auto rc = check_http_code(curl, response);
curl_easy_cleanup(curl);
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int HttpRequest::get_json(const std::string& url, std::string& response)
{
auto curl = curl_easy_init(); // todo use RAII to automatically clean up
if (!curl) return -1;
auto headers = curl_slist_append(nullptr, "Accept: application/json");
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_to_string);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
if (_timeout != 0)
{
curl_easy_setopt(curl, CURLOPT_TIMEOUT, _timeout);
}
if (!_proxy.empty())
{
curl_easy_setopt(curl, CURLOPT_PROXY, _proxy);
}
auto ec = curl_easy_perform(curl);
if (ec != CURLE_OK)
{
response = curl_easy_strerror(ec);
curl_easy_cleanup(curl);
return -1;
}
auto rc = check_http_code(curl, response);
curl_easy_cleanup(curl);
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
size_t HttpRequest::write_to_string(void *ptr, size_t size, size_t count, void *str)
{
((string*)str)->assign((char*)ptr, 0, size*count);
return size*count;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int HttpRequest::check_http_code(CURL* curl, std::string& msg)
{
long http_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
if (http_code != 200)
{
msg = "Http code " + to_string(http_code) + ": " + msg;
return -1;
}
return 0;
}

View File

@ -24,6 +24,7 @@ lib_name='nebula_common'
source_files=[
'Attribute.cc',
'ExtendedAttribute.cc',
'HttpRequest.cc',
'NebulaService.cc',
'NebulaUtil.cc',
'SSLUtil.cc'

View File

@ -19,10 +19,12 @@ import os
# This is the absolute path where the project is located
cwd=os.getcwd()
workspace_dir=GetLaunchDir()
# Include dirs
env.Append(CPPPATH=[
cwd + '/include/',
workspace_dir + '/share/vendor/',
])
# Library dirs

View File

@ -96,6 +96,8 @@
#
# MAX_BACKUPS_HOST: Maximum number of active backup operations per host.
#
# ORCHESTRATOR_URL: External scheduler, which manages deployment of VMs to hosts
#
#*******************************************************************************
MESSAGE_SIZE = 1073741824
@ -137,6 +139,12 @@ DEFAULT_NIC_SCHED = [
# RANK = "- (RUNNING_VMS * 50 + FREE_CPU)"
#]
# EXTERNAL_SCHEDULER = [
# SERVER = "http://localhost:4567",
# PROXY = "",
# TIMEOUT = 10
# ]
LOG = [
SYSTEM = "file",
DEBUG_LEVEL = 3

View File

@ -17,7 +17,6 @@
#ifndef SCHEDULER_H_
#define SCHEDULER_H_
#include "Log.h"
#include "HostPoolXML.h"
#include "VMGroupPoolXML.h"
#include "UserPoolXML.h"
@ -29,6 +28,7 @@
#include "Listener.h"
#include "AclXML.h"
#include "MonitorXML.h"
#include "HttpRequest.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -174,6 +174,8 @@ protected:
virtual void do_vm_groups();
void external_scheduler();
private:
Scheduler(Scheduler const&) = delete;
@ -241,6 +243,11 @@ private:
*/
Template oned_conf;
/**
* External Scheduler - sends and receives HTTP requests
*/
HttpRequest ext_scheduler;
// ---------------------------------------------------------------
// Timer to periodically schedule and dispatch VMs
// ---------------------------------------------------------------

View File

@ -20,6 +20,8 @@
#include <sstream>
#include <nlohmann/json.hpp>
#include "ObjectXML.h"
#include "HostPoolXML.h"
#include "Resource.h"
@ -129,6 +131,8 @@ public:
//--------------------------------------------------------------------------
int get_state() const { return state; };
std::string get_state_str() const;
int get_lcm_state() const { return lcm_state; };
int get_oid() const { return oid; };
@ -155,6 +159,8 @@ public:
void set_only_public_cloud() { only_public_cloud = true; }
void to_json(nlohmann::json &vm_json);
//--------------------------------------------------------------------------
// Scheduling requirements and rank
//--------------------------------------------------------------------------
@ -404,6 +410,11 @@ public:
return vm_template.get();
}
VirtualMachineTemplate * get_user_template()
{
return user_template.get();
}
/**
* Sets an attribute in the VM Template, it must be allocated in the heap
*

View File

@ -22,9 +22,12 @@
#include "NebulaUtil.h"
#include "History.h"
#include "RankScheduler.h"
#include "VirtualMachine.h"
using namespace std;
using json = nlohmann::json;
/******************************************************************************/
/******************************************************************************/
/* INITIALIZE VM object attributes from its XML representation */
@ -218,7 +221,7 @@ void VirtualMachineXML::init_attributes()
if (get_nodes("/VM/USER_TEMPLATE", nodes) > 0)
{
user_template = make_unique<VirtualMachineTemplate>();
user_template = make_unique<VirtualMachineTemplate>(false,'=',"USER_TEMPLATE");
user_template->from_xml_node(nodes[0]);
@ -350,6 +353,8 @@ void VirtualMachineXML::init_storage_usage()
}
}
/* -------------------------------------------------------------------------- */
/******************************************************************************/
/******************************************************************************/
/* VM requirements and capacity interface */
@ -675,6 +680,92 @@ int VirtualMachineXML::parse_action_name(string& action_st)
return 0;
};
// We have to duplicate method from VirtualMachine.h, otherwise we get into linking hell
static string state_to_str(VirtualMachine::VmState state)
{
string st;
switch (state)
{
case VirtualMachine::INIT:
st = "INIT"; break;
case VirtualMachine::PENDING:
st = "PENDING"; break;
case VirtualMachine::HOLD:
st = "HOLD"; break;
case VirtualMachine::ACTIVE:
st = "ACTIVE"; break;
case VirtualMachine::STOPPED:
st = "STOPPED"; break;
case VirtualMachine::SUSPENDED:
st = "SUSPENDED"; break;
case VirtualMachine::DONE:
st = "DONE"; break;
case VirtualMachine::POWEROFF:
st = "POWEROFF"; break;
case VirtualMachine::UNDEPLOYED:
st = "UNDEPLOYED"; break;
case VirtualMachine::CLONING:
st = "CLONING"; break;
case VirtualMachine::CLONING_FAILURE:
st = "CLONING_FAILURE"; break;
}
return st;
}
void VirtualMachineXML::to_json(json &vm_json)
{
vm_json["ID"] = oid;
vm_json["STATE"] = state_to_str(static_cast<VirtualMachine::VmState>(state));
// -------------------------------------------------------------------------
// Add matching Hosts
// -------------------------------------------------------------------------
const vector<Resource *>& hosts = match_hosts.get_resources();
json hosts_json = json::array();
for (const auto& h : hosts)
{
hosts_json += h->oid;
}
vm_json["HOST_IDS"] = hosts_json;
// -------------------------------------------------------------------------
// Add Template and UserTemplate
// -------------------------------------------------------------------------
string templ_str, user_templ_str;
vm_template->to_json(templ_str);
templ_str = "{" + templ_str + "}";
auto templ_json = json::parse(templ_str);
vm_json["TEMPLATE"] = templ_json["TEMPLATE"];
user_template->to_json(user_templ_str);
user_templ_str = "{" + user_templ_str + "}";
auto user_templ_json = json::parse(user_templ_str);
vm_json["USER_TEMPLATE"] = user_templ_json["USER_TEMPLATE"];
// -------------------------------------------------------------------------
// Add requirements
// -------------------------------------------------------------------------
json req;
req["CPU"] = cpu;
req["MEMORY"] = memory * 1024;
req["DISK_SIZE"] = system_ds_usage;
vm_json["CAPACITY"] = req;
}
//******************************************************************************
// Updates to oned
//******************************************************************************

View File

@ -14,30 +14,23 @@
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include <stdexcept>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <pwd.h>
#include <pthread.h>
#include <cmath>
#include <iomanip>
#include "Scheduler.h"
#include "SchedulerTemplate.h"
#include "RankPolicy.h"
#include "NebulaLog.h"
#include "PoolObjectAuth.h"
#include "NebulaUtil.h"
#include "VirtualMachine.h"
#include <nlohmann/json.hpp>
#include <stdexcept>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <iomanip>
using namespace std;
using json = nlohmann::json;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -189,6 +182,37 @@ void Scheduler::start()
throw;
}
// Setup External Scheduler
{
VectorAttribute *ext_sched_va;
ext_sched_va = conf.get("EXTERNAL_SCHEDULER");
if (ext_sched_va)
{
string url, proxy;
long timeout;
if (ext_sched_va->vector_value("SERVER", url) == 0)
{
ext_scheduler.set_server(url);
}
if (ext_sched_va->vector_value("PROXY", proxy) == 0)
{
ext_scheduler.set_proxy(proxy);
}
if (ext_sched_va->vector_value("TIMEOUT", timeout) == 0)
{
ext_scheduler.set_timeout(timeout);
}
NebulaLog::info("SCHED", "External Scheduler configured (server = '" + url +
"', timeout = " + to_string(timeout) +
", proxy = '" + proxy + "').");
}
}
oss.str("");
oss << "Starting Scheduler Daemon" << endl;
@ -1720,6 +1744,71 @@ void Scheduler::do_vm_groups()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void Scheduler::external_scheduler()
{
if (!ext_scheduler.is_initialized())
{
return;
}
try {
// Serialize pending VMs to JSON
const map<int, ObjectXML*>& pending_vms = vmpool->get_objects();
json pending_json;
for (auto vm_it : pending_vms)
{
auto vm = static_cast<VirtualMachineXML*>(vm_it.second);
json vm_json;
vm->to_json(vm_json);
pending_json["VMS"] += vm_json;
}
// Call Http Request
string response;
if (ext_scheduler.post_json(pending_json.dump(), response) != 0)
{
NebulaLog::error("SCH", "Error connecting to External Scheduler: " + response);
return;
}
// Parse the result, update VM matched hosts by values from Orchestrator
auto resp = json::parse(response);
auto vms_json = resp["VMS"];
NebulaLog::info("SCHED", "External Scheduler: Received scheduling for "
+ to_string(vms_json.size()) + " VMs");
for (auto vm_json : vms_json)
{
// Note: We use only valid answers for VMs, ignoring others
auto vm = static_cast<VirtualMachineXML*>(pending_vms.at(vm_json["ID"]));
int host_id = vm_json["HOST_ID"];
auto host = hpool->get(host_id);
if (vm && host)
{
vm->clear_match_hosts();
vm->add_match_host(host_id);
}
}
}
catch(const exception &e)
{
NebulaLog::error("SCHED", e.what());
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void Scheduler::timer_action()
{
int rc;
@ -1789,7 +1878,13 @@ void Scheduler::timer_action()
do_vm_groups();
profile(false,"Setting VM groups placement constraints.");
profile(true);
match_schedule();
profile(false,"Match scheduled resources, sort by priorities.");
profile(true);
external_scheduler();
profile(false,"Call external Scheduler.");
profile(true);
dispatch();