From a5778c8e28ba2c95ce8aff82be2cc3ac938dc970 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 17:18:00 +0200 Subject: [PATCH] M #-: subscribe to service changes --- include/VirtualMachine.h | 1 - src/flow/lib/ServiceWatchDog.rb | 45 ++++++++++++++------------------- src/hm/HookStateVM.cc | 9 ++++++- src/hm_mad/one_hm.rb | 4 ++- 4 files changed, 30 insertions(+), 29 deletions(-) diff --git a/include/VirtualMachine.h b/include/VirtualMachine.h index 85ba7b0528..a55b194083 100644 --- a/include/VirtualMachine.h +++ b/include/VirtualMachine.h @@ -238,7 +238,6 @@ public: resched = do_sched ? 1 : 0; }; - // ------------------------------------------------------------------------- // Log & Print // ------------------------------------------------------------------------- diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 20f614c2de..7c2a342240 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -15,7 +15,9 @@ # limitations under the License. # #--------------------------------------------------------------------------- # +require 'base64' require 'ffi-rzmq' +require 'nokogiri' require 'EventManager' # Service watchdog class @@ -98,7 +100,7 @@ class ServiceWD def update(service_id, role_name, node) subscriber = gen_subscriber - unsubscribe(node, subscriber) + unsubscribe(service_id, subscriber) @nodes_mutex.synchronize do return if @services_nodes[service_id].nil? @@ -136,13 +138,7 @@ class ServiceWD # subscribe to all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - subscribe(node, subscriber) - end - end - end + subscribe(service_id, subscriber) key = '' content = '' @@ -156,18 +152,19 @@ class ServiceWD Log.error LOG_COMP, 'Error reading from subscriber.' end - # key format: EVENT VM VM_ID/STATE/LCM_STATE + # key format: EVENT SERVICE SERVICE_ID next if key.nil? split_key = key.split # if there is no data skip - next if split_key[2].nil? + next if content.nil? - split_key = key.split[2].split('/') - node = split_key[0].to_i - state = split_key[1] - lcm_state = split_key[2] + xml = Nokogiri::XML(Base64.decode64(content)) + + node = xml.xpath("/HOOK_MESSAGE/VM/ID").text.to_i + state = xml.xpath("/HOOK_MESSAGE/STATE").text + lcm_state = xml.xpath("/HOOK_MESSAGE/LCM_STATE").text role_name = find_by_id(service_id, node) # if the VM is not from the service skip @@ -188,13 +185,9 @@ class ServiceWD # unsubscribe from all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - unsubscribe(node, subscriber) - end - end + unsubscribe(service_id, subscriber) + @nodes_mutex.synchronize do @services_nodes.delete(service_id) end end @@ -221,18 +214,18 @@ class ServiceWD # Subscribe to VM state changes # - # @param vm_id [Integer] VM ID to subscribe + # @param service_id [Integer] Service ID to subscribe # @param subscriber [ZMQ] ZMQ subscriber object - def subscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT VM #{vm_id}") + def subscribe(service_id, subscriber) + subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT SERVICE #{service_id}") end # Unsubscribe from VM state changes # - # @param vm_id [Integer] VM ID to unsubscribe + # @param service_id [Integer] Service ID to subscribe # @param subscriber [ZMQ] ZMQ subscriber object - def unsubscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT VM #{vm_id}") + def unsubscribe(service_id, subscriber) + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT SERVICE #{service_id}") end # Check service roles state diff --git a/src/hm/HookStateVM.cc b/src/hm/HookStateVM.cc index 9f2d1a88aa..7ebaa53c5a 100644 --- a/src/hm/HookStateVM.cc +++ b/src/hm/HookStateVM.cc @@ -30,7 +30,7 @@ string * HookStateVM::format_message(VirtualMachine * vm) { std::ostringstream oss; std::string vm_xml; - std::string state, lcm_state; + std::string state, lcm_state, service_id; oss << "" << "STATE" @@ -39,6 +39,13 @@ string * HookStateVM::format_message(VirtualMachine * vm) << "" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "" << "" << vm->get_oid() << ""; + vm->get_user_template_attribute("SERVICE_ID", service_id); + + if ( !service_id.empty() ) + { + oss << "" << service_id << ""; + } + if ( vm->hasHistory() ) { oss << "" << vm->get_hostname() << ""; diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index 6cadb65952..b20a4103bd 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -146,9 +146,11 @@ class HookManagerDriver < OpenNebulaDriver state = xml.xpath('//STATE')[0].text lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' resource_id = xml.xpath('//RESOURCE_ID')[0].text + service_id = xml.xpath('//SERVICE_ID')[0].text ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", - "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] + "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} ", + "SERVICE #{service_id} "] else [''] end