1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-02-02 09:47:00 +03:00

M #-: subscribe to service changes

This commit is contained in:
Alejandro Huertas 2020-05-18 17:18:00 +02:00
parent 7e43c4632a
commit a5778c8e28
No known key found for this signature in database
GPG Key ID: 3044AF06BE405104
4 changed files with 30 additions and 29 deletions

View File

@ -238,7 +238,6 @@ public:
resched = do_sched ? 1 : 0; resched = do_sched ? 1 : 0;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Log & Print // Log & Print
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -15,7 +15,9 @@
# limitations under the License. # # limitations under the License. #
#--------------------------------------------------------------------------- # #--------------------------------------------------------------------------- #
require 'base64'
require 'ffi-rzmq' require 'ffi-rzmq'
require 'nokogiri'
require 'EventManager' require 'EventManager'
# Service watchdog class # Service watchdog class
@ -98,7 +100,7 @@ class ServiceWD
def update(service_id, role_name, node) def update(service_id, role_name, node)
subscriber = gen_subscriber subscriber = gen_subscriber
unsubscribe(node, subscriber) unsubscribe(service_id, subscriber)
@nodes_mutex.synchronize do @nodes_mutex.synchronize do
return if @services_nodes[service_id].nil? return if @services_nodes[service_id].nil?
@ -136,13 +138,7 @@ class ServiceWD
# subscribe to all nodes # subscribe to all nodes
subscriber = gen_subscriber subscriber = gen_subscriber
@nodes_mutex.synchronize do subscribe(service_id, subscriber)
@services_nodes[service_id].each do |_, nodes|
nodes.each do |node|
subscribe(node, subscriber)
end
end
end
key = '' key = ''
content = '' content = ''
@ -156,18 +152,19 @@ class ServiceWD
Log.error LOG_COMP, 'Error reading from subscriber.' Log.error LOG_COMP, 'Error reading from subscriber.'
end end
# key format: EVENT VM VM_ID/STATE/LCM_STATE # key format: EVENT SERVICE SERVICE_ID
next if key.nil? next if key.nil?
split_key = key.split split_key = key.split
# if there is no data skip # if there is no data skip
next if split_key[2].nil? next if content.nil?
split_key = key.split[2].split('/') xml = Nokogiri::XML(Base64.decode64(content))
node = split_key[0].to_i
state = split_key[1] node = xml.xpath("/HOOK_MESSAGE/VM/ID").text.to_i
lcm_state = split_key[2] state = xml.xpath("/HOOK_MESSAGE/STATE").text
lcm_state = xml.xpath("/HOOK_MESSAGE/LCM_STATE").text
role_name = find_by_id(service_id, node) role_name = find_by_id(service_id, node)
# if the VM is not from the service skip # if the VM is not from the service skip
@ -188,13 +185,9 @@ class ServiceWD
# unsubscribe from all nodes # unsubscribe from all nodes
subscriber = gen_subscriber subscriber = gen_subscriber
@nodes_mutex.synchronize do unsubscribe(service_id, subscriber)
@services_nodes[service_id].each do |_, nodes|
nodes.each do |node|
unsubscribe(node, subscriber)
end
end
@nodes_mutex.synchronize do
@services_nodes.delete(service_id) @services_nodes.delete(service_id)
end end
end end
@ -221,18 +214,18 @@ class ServiceWD
# Subscribe to VM state changes # Subscribe to VM state changes
# #
# @param vm_id [Integer] VM ID to subscribe # @param service_id [Integer] Service ID to subscribe
# @param subscriber [ZMQ] ZMQ subscriber object # @param subscriber [ZMQ] ZMQ subscriber object
def subscribe(vm_id, subscriber) def subscribe(service_id, subscriber)
subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT VM #{vm_id}") subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT SERVICE #{service_id}")
end end
# Unsubscribe from VM state changes # Unsubscribe from VM state changes
# #
# @param vm_id [Integer] VM ID to unsubscribe # @param service_id [Integer] Service ID to subscribe
# @param subscriber [ZMQ] ZMQ subscriber object # @param subscriber [ZMQ] ZMQ subscriber object
def unsubscribe(vm_id, subscriber) def unsubscribe(service_id, subscriber)
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT VM #{vm_id}") subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT SERVICE #{service_id}")
end end
# Check service roles state # Check service roles state

View File

@ -30,7 +30,7 @@ string * HookStateVM::format_message(VirtualMachine * vm)
{ {
std::ostringstream oss; std::ostringstream oss;
std::string vm_xml; std::string vm_xml;
std::string state, lcm_state; std::string state, lcm_state, service_id;
oss << "<HOOK_MESSAGE>" oss << "<HOOK_MESSAGE>"
<< "<HOOK_TYPE>STATE</HOOK_TYPE>" << "<HOOK_TYPE>STATE</HOOK_TYPE>"
@ -39,6 +39,13 @@ string * HookStateVM::format_message(VirtualMachine * vm)
<< "<LCM_STATE>" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "</LCM_STATE>" << "<LCM_STATE>" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "</LCM_STATE>"
<< "<RESOURCE_ID>" << vm->get_oid() << "</RESOURCE_ID>"; << "<RESOURCE_ID>" << vm->get_oid() << "</RESOURCE_ID>";
vm->get_user_template_attribute("SERVICE_ID", service_id);
if ( !service_id.empty() )
{
oss << "<SERVICE_ID>" << service_id << "</SERVICE_ID>";
}
if ( vm->hasHistory() ) if ( vm->hasHistory() )
{ {
oss << "<REMOTE_HOST>" << vm->get_hostname() << "</REMOTE_HOST>"; oss << "<REMOTE_HOST>" << vm->get_hostname() << "</REMOTE_HOST>";

View File

@ -146,9 +146,11 @@ class HookManagerDriver < OpenNebulaDriver
state = xml.xpath('//STATE')[0].text state = xml.xpath('//STATE')[0].text
lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM'
resource_id = xml.xpath('//RESOURCE_ID')[0].text resource_id = xml.xpath('//RESOURCE_ID')[0].text
service_id = xml.xpath('//SERVICE_ID')[0].text
["#{obj} #{resource_id}/#{state}/#{lcm_state} ", ["#{obj} #{resource_id}/#{state}/#{lcm_state} ",
"STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} ",
"SERVICE #{service_id} "]
else else
[''] ['']
end end