1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-07 17:17:41 +03:00

F #3859: Message timestamp

* Discard out-of-order messages
  * New message format for monitor messages (include timestamp)
  * Consistent logging for debug

co-authored-by: Pavel Czerny <pczerny@opennebula.systems>
This commit is contained in:
Ruben S. Montero 2020-06-05 12:03:15 +02:00
parent 51edd58c21
commit e1086b9838
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
8 changed files with 214 additions and 29 deletions

View File

@ -56,12 +56,14 @@ class InformationManagerDriver < OpenNebulaDriver
@hypervisor = hypervisor
@stdout_mutex = Mutex.new
# register actions
register_action(:START_MONITOR, method('start_monitor'))
register_action(:STOP_MONITOR, method('stop_monitor'))
end
def start_monitor(_not_used, _hostid, zaction64)
def start_monitor(_not_used, _hostid, _timestamp, zaction64)
rc, input = parse_input(:START_MONITOR, zaction64)
return if rc == -1
@ -73,29 +75,31 @@ class InformationManagerDriver < OpenNebulaDriver
return if rc == -1
end
do_action(input[:im_mad],
result, info = do_action(input[:im_mad],
input[:host_id],
input[:hostname],
:START_MONITOR,
:stdin => input[:stdin],
:stdin => input[:stdin],
:script_name => 'run_monitord_client',
:zip => true,
:base64 => true)
:respond => false)
write_respond(:START_MONITOR, result, input[:host_id], info)
end
def stop_monitor(_not_used, _hostid, zaction64)
def stop_monitor(_not_used, _hostid, _timestamp, zaction64)
rc, input = parse_input(:STOP_MONITOR, zaction64)
return if rc == -1
do_action(input[:im_mad],
result, info = do_action(input[:im_mad],
input[:host_id],
input[:hostname],
:STOP_MONITOR,
:script_name => 'stop_monitord_client',
:stdin => input[:stdin],
:zip => true,
:base64 => true)
:stdin => input[:stdin],
:respond => false)
write_respond(:STOP_MONITOR, result, input[:host_id], info)
end
private
@ -212,6 +216,15 @@ class InformationManagerDriver < OpenNebulaDriver
end
end
def write_respond(action="-", result=RESULT[:failure], id="-", info="-")
info = Zlib::Deflate.deflate(info, Zlib::BEST_COMPRESSION)
info = Base64.strict_encode64(info)
@stdout_mutex.synchronize {
STDOUT.puts "#{action} #{result} #{id} #{Time.now.to_i} #{info}"
STDOUT.flush
}
end
end
# Information Manager main program

View File

@ -104,7 +104,9 @@ class ProbeRunner
return rc, ret if rc == -1
end
ret+='</MONITOR_MESSAGES>'
ret += "<TIMESTAMP>#{Time.now.to_i}</TIMESTAMP>"
ret += '</MONITOR_MESSAGES>'
[rc, ret]
end

View File

@ -35,14 +35,14 @@ class MonitorClient
MESSAGE_TYPES.each do |mt|
define_method("#{mt}_udp".downcase.to_sym) do |rc, payload|
msg = "#{mt} #{MESSAGE_STATUS[rc]} #{@hostid} #{pack(payload)}"
msg = "#{mt} #{MESSAGE_STATUS[rc]} #{@hostid} #{Time.now.to_i} #{pack(payload)}"
@socket_udp.send(msg, 0, @host, @port)
end
end
MESSAGE_TYPES.each do |mt|
define_method("#{mt}_tcp".downcase.to_sym) do |rc, payload|
msg = "#{mt} #{MESSAGE_STATUS[rc]} #{@hostid} #{pack(payload)}"
msg = "#{mt} #{MESSAGE_STATUS[rc]} #{@hostid} #{Time.now.to_i} #{pack(payload)}"
socket_tcp = TCPSocket.new(@host, @port)
socket_tcp.send(msg, 0)

View File

@ -84,13 +84,23 @@ public:
};
time_t last_monitored() const { return _last_monitored; }
void last_monitored(time_t lm) { _last_monitored = lm; }
bool monitor_in_progress() const { return _monitor_in_progress; }
void monitor_in_progress(bool mip) { _monitor_in_progress = mip; }
time_t last_state_vm() const { return _last_state_vm; }
void last_state_vm(time_t lsv) { _last_state_vm = lsv; }
time_t last_monitor_vm() const { return _last_monitor_vm; }
void last_monitor_vm(time_t lmv) { _last_monitor_vm = lmv; }
time_t last_monitor_host() const { return _last_monitor_host; }
void last_monitor_host(time_t lmh) { _last_monitor_host = lmh; }
time_t last_system_host() const { return _last_system_host; }
void last_system_host(time_t lsh) { _last_system_host = lsh; }
/**
* Prints the Host information to an output stream. This function is used
* for logging purposes.
@ -111,6 +121,12 @@ private:
time_t _last_monitored = 0;
bool _monitor_in_progress = false;
time_t _last_state_vm = 0;
time_t _last_monitor_vm = 0;
time_t _last_monitor_host = 0;
time_t _last_system_host = 0;
};
#endif // HOST_BASE_H_

View File

@ -107,6 +107,17 @@ public:
*/
void delete_host(int oid);
/**
* Check if the message is valid based in the TIMESTAMP attribute
* @param type of the message
* @param oid of the host
* @param ts message timestamp
*
* @return true if message timestamp is greater than last timestamp or
* no timestamp is found in payload
*/
bool test_set_timestamp(MonitorDriverMessages type, int oid, time_t ts) const;
/**
* Sets the monitor information of the host. It notifies oned if needed.
* @param oid host id

View File

@ -48,15 +48,16 @@ int rsa_private_decrypt(const std::string& in, std::string& out);
* This class represents a generic message used by the Monitoring Protocol.
* The structure of the message is:
*
* +------+-----+--------+-----+-----+---------+------+
* | TYPE | ' ' | STATUS | ' ' | OID | PAYLOAD | '\n' |
* +------+-----+--------+-----+-----+---------+------+
* +------+-----+--------+-----+-----+-----+----+-----+---------+------+
* | TYPE | ' ' | STATUS | ' ' | OID | ' ' | TS | ' ' | PAYLOAD | '\n' |
* +------+-----+--------+-----+-----+-----+----+-----+---------+------+
*
* TYPE String (non-blanks) identifying the message type
* ' ' A single white space to separate fields
* STATUS String (non-blanks), status of the message depends on message
* type, could contain result of operation ("SUCCESS" or "FAILURE")
* OID Number, id of affected object, -1 if not object related
* TS timestamp for the message in epoch.
* PAYLOAD of the message XML base64 encoded
* '\n' End of message delimiter
*
@ -154,6 +155,19 @@ public:
_payload = p;
}
/**
* Message timestamp, optional
*/
time_t timestamp() const
{
return _timestamp;
}
void timestamp(time_t ts)
{
_timestamp = ts;
}
private:
/**
* Message fields
@ -166,6 +180,8 @@ private:
std::string _payload;
time_t _timestamp = 0;
static const EString<E> _type_str;
};
@ -201,6 +217,8 @@ int Message<E>::parse_from(const std::string& input, bool decrypt)
is >> _oid;
is >> _timestamp;
is >> buffer;
if (buffer.empty())
@ -275,6 +293,8 @@ int Message<E>::write_to(std::string& out, bool encrypt) const
out += ' ';
out += std::to_string(_oid);
out += ' ';
out += std::to_string(_timestamp);
out += ' ';
out += payloaz64;
out += '\n';

View File

@ -251,6 +251,7 @@ void HostMonitorManager::monitor_host(int oid, const Template &tmpl)
return;
}
HostMonitoringTemplate monitoring;
monitoring.oid(oid);
@ -570,3 +571,77 @@ void HostMonitorManager::error_monitor(int oid, const string& msg)
oned_driver->host_state(oid, oss.str());
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
bool HostMonitorManager::test_set_timestamp(MonitorDriverMessages type, int oid,
time_t ts) const
{
time_t last_ts;
if (ts == 0)
{
return true;
}
auto host = hpool->get(oid);
if (!host.valid())
{
NebulaLog::warn("HMM", "message ignored for unknown host "
+ to_string(oid));
return false;
}
switch(type)
{
case MonitorDriverMessages::MONITOR_VM:
last_ts = host->last_monitor_vm();
break;
case MonitorDriverMessages::SYSTEM_HOST:
last_ts = host->last_system_host();
break;
case MonitorDriverMessages::STATE_VM:
last_ts = host->last_state_vm();
break;
case MonitorDriverMessages::MONITOR_HOST:
last_ts = host->last_monitor_host();
break;
default:
return true;
}
if ( last_ts > ts )
{
NebulaLog::warn("HMM", "out of order message ignored for host "
+ to_string(oid));
return false;
}
switch(type)
{
case MonitorDriverMessages::MONITOR_VM:
host->last_monitor_vm(ts);
break;
case MonitorDriverMessages::SYSTEM_HOST:
host->last_system_host(ts);
break;
case MonitorDriverMessages::STATE_VM:
host->last_state_vm(ts);
break;
case MonitorDriverMessages::MONITOR_HOST:
host->last_monitor_host(ts);
break;
default:
break;
}
return true;
}

View File

@ -22,6 +22,8 @@
#include "MonitorDriverMessages.h"
#include <iomanip>
HostMonitorManager * MonitorDriverProtocol::hm = nullptr;
/* -------------------------------------------------------------------------- */
@ -32,12 +34,33 @@ void MonitorDriverProtocol::_undefined(message_t msg)
NebulaLog::info("MDP", "Received UNDEFINED msg: " + msg->payload());
}
static void log_message(const std::unique_ptr<Message<MonitorDriverMessages>>& msg)
{
if ( NebulaLog::log_level() < Log::DDEBUG )
{
return;
}
ostringstream oss;
struct tm tms;
time_t ts = msg->timestamp();
localtime_r(&ts, &tms);
oss << "[" << tms.tm_hour << ":" << tms.tm_min << ":" << tms.tm_sec
<< "] Recieved " << msg->type_str() << " message from host "
<< msg->oid() << ":\n" << msg->payload();
NebulaLog::ddebug("MDP", oss.str());
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void MonitorDriverProtocol::_monitor_vm(message_t msg)
{
NebulaLog::ddebug("MDP", "Received MONITOR_VM msg: " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -48,6 +71,12 @@ void MonitorDriverProtocol::_monitor_vm(message_t msg)
return;
}
if (!hm->test_set_timestamp(MonitorDriverMessages::MONITOR_VM, msg->oid(),
msg->timestamp()))
{
return;
}
Template tmpl;
char * error_msg;
@ -147,8 +176,7 @@ void MonitorDriverProtocol::_monitor_vm(message_t msg)
void MonitorDriverProtocol::_beacon_host(message_t msg)
{
NebulaLog::ddebug("MDP", "Received beacon for host " +
to_string(msg->oid()) + ": " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -170,8 +198,7 @@ void MonitorDriverProtocol::_beacon_host(message_t msg)
void MonitorDriverProtocol::_monitor_host(message_t msg)
{
NebulaLog::ddebug("MDP", "Received monitoring information for host " +
to_string(msg->oid()) + ": " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -182,6 +209,12 @@ void MonitorDriverProtocol::_monitor_host(message_t msg)
return;
}
if (!hm->test_set_timestamp(MonitorDriverMessages::MONITOR_HOST, msg->oid(),
msg->timestamp()))
{
return;
}
Template tmpl;
char* error_msg;
@ -210,8 +243,7 @@ void MonitorDriverProtocol::_monitor_host(message_t msg)
void MonitorDriverProtocol::_system_host(message_t msg)
{
NebulaLog::ddebug("MDP", "Received system information for host " +
to_string(msg->oid()) + ": " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -222,6 +254,12 @@ void MonitorDriverProtocol::_system_host(message_t msg)
return;
}
if (!hm->test_set_timestamp(MonitorDriverMessages::SYSTEM_HOST, msg->oid(),
msg->timestamp()))
{
return;
}
auto oned = hm->get_oned_driver();
oned->host_system_info(msg->oid(), msg->status(), msg->payload());
}
@ -231,8 +269,7 @@ void MonitorDriverProtocol::_system_host(message_t msg)
void MonitorDriverProtocol::_state_vm(message_t msg)
{
NebulaLog::ddebug("MDP", "Received state vm message for host " +
to_string(msg->oid()) + ": " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -243,6 +280,12 @@ void MonitorDriverProtocol::_state_vm(message_t msg)
return;
}
if (!hm->test_set_timestamp(MonitorDriverMessages::STATE_VM, msg->oid(),
msg->timestamp()))
{
return;
}
auto oned = hm->get_oned_driver();
oned->vm_state(msg->oid(), msg->payload());
}
@ -262,8 +305,7 @@ void MonitorDriverProtocol::_state_vm(message_t msg)
*/
void MonitorDriverProtocol::_start_monitor(message_t msg)
{
NebulaLog::ddebug("MDP", "Received start monitor for host " +
to_string(msg->oid()) + ": " + msg->payload());
log_message(msg);
if (msg->status() != "SUCCESS")
{
@ -294,6 +336,10 @@ void MonitorDriverProtocol::_start_monitor(message_t msg)
return;
}
time_t ts;
msg_xml.xpath(ts, "/MONITOR_MESSAGES/TIMESTAMP", static_cast<time_t>(0));
const std::vector<MonitorDriverMessages> stypes = {
MonitorDriverMessages::MONITOR_VM,
MonitorDriverMessages::BEACON_HOST,
@ -322,6 +368,8 @@ void MonitorDriverProtocol::_start_monitor(message_t msg)
m->oid(msg->oid());
m->timestamp(ts);
m->status("SUCCESS");
m->payload(payload);