From a5778c8e28ba2c95ce8aff82be2cc3ac938dc970 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 17:18:00 +0200 Subject: [PATCH 1/7] M #-: subscribe to service changes --- include/VirtualMachine.h | 1 - src/flow/lib/ServiceWatchDog.rb | 45 ++++++++++++++------------------- src/hm/HookStateVM.cc | 9 ++++++- src/hm_mad/one_hm.rb | 4 ++- 4 files changed, 30 insertions(+), 29 deletions(-) diff --git a/include/VirtualMachine.h b/include/VirtualMachine.h index 85ba7b0528..a55b194083 100644 --- a/include/VirtualMachine.h +++ b/include/VirtualMachine.h @@ -238,7 +238,6 @@ public: resched = do_sched ? 1 : 0; }; - // ------------------------------------------------------------------------- // Log & Print // ------------------------------------------------------------------------- diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 20f614c2de..7c2a342240 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -15,7 +15,9 @@ # limitations under the License. # #--------------------------------------------------------------------------- # +require 'base64' require 'ffi-rzmq' +require 'nokogiri' require 'EventManager' # Service watchdog class @@ -98,7 +100,7 @@ class ServiceWD def update(service_id, role_name, node) subscriber = gen_subscriber - unsubscribe(node, subscriber) + unsubscribe(service_id, subscriber) @nodes_mutex.synchronize do return if @services_nodes[service_id].nil? @@ -136,13 +138,7 @@ class ServiceWD # subscribe to all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - subscribe(node, subscriber) - end - end - end + subscribe(service_id, subscriber) key = '' content = '' @@ -156,18 +152,19 @@ class ServiceWD Log.error LOG_COMP, 'Error reading from subscriber.' end - # key format: EVENT VM VM_ID/STATE/LCM_STATE + # key format: EVENT SERVICE SERVICE_ID next if key.nil? split_key = key.split # if there is no data skip - next if split_key[2].nil? + next if content.nil? - split_key = key.split[2].split('/') - node = split_key[0].to_i - state = split_key[1] - lcm_state = split_key[2] + xml = Nokogiri::XML(Base64.decode64(content)) + + node = xml.xpath("/HOOK_MESSAGE/VM/ID").text.to_i + state = xml.xpath("/HOOK_MESSAGE/STATE").text + lcm_state = xml.xpath("/HOOK_MESSAGE/LCM_STATE").text role_name = find_by_id(service_id, node) # if the VM is not from the service skip @@ -188,13 +185,9 @@ class ServiceWD # unsubscribe from all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - unsubscribe(node, subscriber) - end - end + unsubscribe(service_id, subscriber) + @nodes_mutex.synchronize do @services_nodes.delete(service_id) end end @@ -221,18 +214,18 @@ class ServiceWD # Subscribe to VM state changes # - # @param vm_id [Integer] VM ID to subscribe + # @param service_id [Integer] Service ID to subscribe # @param subscriber [ZMQ] ZMQ subscriber object - def subscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT VM #{vm_id}") + def subscribe(service_id, subscriber) + subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT SERVICE #{service_id}") end # Unsubscribe from VM state changes # - # @param vm_id [Integer] VM ID to unsubscribe + # @param service_id [Integer] Service ID to subscribe # @param subscriber [ZMQ] ZMQ subscriber object - def unsubscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT VM #{vm_id}") + def unsubscribe(service_id, subscriber) + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT SERVICE #{service_id}") end # Check service roles state diff --git a/src/hm/HookStateVM.cc b/src/hm/HookStateVM.cc index 9f2d1a88aa..7ebaa53c5a 100644 --- a/src/hm/HookStateVM.cc +++ b/src/hm/HookStateVM.cc @@ -30,7 +30,7 @@ string * HookStateVM::format_message(VirtualMachine * vm) { std::ostringstream oss; std::string vm_xml; - std::string state, lcm_state; + std::string state, lcm_state, service_id; oss << "" << "STATE" @@ -39,6 +39,13 @@ string * HookStateVM::format_message(VirtualMachine * vm) << "" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "" << "" << vm->get_oid() << ""; + vm->get_user_template_attribute("SERVICE_ID", service_id); + + if ( !service_id.empty() ) + { + oss << "" << service_id << ""; + } + if ( vm->hasHistory() ) { oss << "" << vm->get_hostname() << ""; diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index 6cadb65952..b20a4103bd 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -146,9 +146,11 @@ class HookManagerDriver < OpenNebulaDriver state = xml.xpath('//STATE')[0].text lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' resource_id = xml.xpath('//RESOURCE_ID')[0].text + service_id = xml.xpath('//SERVICE_ID')[0].text ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", - "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] + "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} ", + "SERVICE #{service_id} "] else [''] end From adc815235e665e0587ec5d4c0c9af5d1ee49afb2 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 17:24:48 +0200 Subject: [PATCH 2/7] M #-: subscribe to service changes --- src/hm_mad/one_hm.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index b20a4103bd..545b972385 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -146,11 +146,13 @@ class HookManagerDriver < OpenNebulaDriver state = xml.xpath('//STATE')[0].text lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' resource_id = xml.xpath('//RESOURCE_ID')[0].text - service_id = xml.xpath('//SERVICE_ID')[0].text + service_id = xml.xpath('//SERVICE_ID')[0] + service_id = service_id.text if service - ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", - "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} ", - "SERVICE #{service_id} "] + ret = ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", + "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] + + ret << "SERVICE #{service_id} " if service_id else [''] end From ed4b9ff98511a76300e089eca3e413299e35dd49 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 17:25:25 +0200 Subject: [PATCH 3/7] M #-: subscribe to service changes --- src/hm_mad/one_hm.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index 545b972385..c20ba1a0b8 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -153,6 +153,8 @@ class HookManagerDriver < OpenNebulaDriver "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] ret << "SERVICE #{service_id} " if service_id + + ret else [''] end From 13f6ea86848e840decfe73bd098ff1bffec9eaba Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 18:21:16 +0200 Subject: [PATCH 4/7] M #-: subscribe to service changes --- src/flow/lib/LifeCycleManager.rb | 30 +----- src/flow/lib/ServiceAutoScaler.rb | 2 +- src/flow/lib/ServiceWatchDog.rb | 147 ++++++------------------------ src/hm_mad/one_hm.rb | 2 +- 4 files changed, 36 insertions(+), 145 deletions(-) diff --git a/src/flow/lib/LifeCycleManager.rb b/src/flow/lib/LifeCycleManager.rb index c817d928a3..f2f36243c2 100644 --- a/src/flow/lib/LifeCycleManager.rb +++ b/src/flow/lib/LifeCycleManager.rb @@ -58,7 +58,6 @@ class ServiceLCM } @event_manager = EventManager.new(em_conf).am - @wd = ServiceWD.new(client, em_conf) # Register Action Manager actions @am.register_action(ACTIONS['DEPLOY_CB'], @@ -90,6 +89,11 @@ class ServiceLCM Thread.new { catch_up(client) } + Thread.new do + wd = ServiceWD.new(client, em_conf) + wd.start @srv_pool + end + Thread.new do auto_scaler = ServiceAutoScaler.new(@srv_pool, client, @@ -239,9 +243,6 @@ class ServiceLCM if service.all_roles_running? service.set_state(Service::STATE['RUNNING']) service.update - - # start watchdog - @wd.start(service.id, service.roles) end # If there is no node in PENDING the service is not modified. @@ -287,9 +288,6 @@ class ServiceLCM ) end - # stop watchdog - @wd.stop(service.id) - set_deploy_strategy(service) roles = service.roles_shutdown @@ -343,9 +341,6 @@ class ServiceLCM ) end - # stop watchdog - @wd.stop(service.id) - role = service.roles[role_name] if role.nil? @@ -482,9 +477,6 @@ class ServiceLCM if service.all_roles_running? service.set_state(Service::STATE['RUNNING']) - - # start watching the service - @wd.start(service.id, service.roles) elsif service.strategy == 'straight' set_deploy_strategy(service) @@ -655,9 +647,6 @@ class ServiceLCM service.set_state(Service::STATE['RUNNING']) service.roles[role_name].set_state(Role::STATE['RUNNING']) - # start watching the service - @wd.start(service.id, service.roles) - service.update end @@ -694,8 +683,6 @@ class ServiceLCM Log.info 'WD', "Update #{service_id}:#{role_name} " \ "cardinality to #{cardinality}" - - @wd.update(service.id, role_name, node) end Log.error 'WD', rc.message if OpenNebula.is_error?(rc) @@ -728,13 +715,6 @@ class ServiceLCM @srv_pool.each do |service| recover_action(client, service.id) if service.transient_state? - - service.info - - if Service::STATE['RUNNING'] == service.state || - Service::STATE['WARNING'] == service.state - @wd.start(service.id, service.roles) - end end end diff --git a/src/flow/lib/ServiceAutoScaler.rb b/src/flow/lib/ServiceAutoScaler.rb index ada5e1cbb2..44ede7fc39 100644 --- a/src/flow/lib/ServiceAutoScaler.rb +++ b/src/flow/lib/ServiceAutoScaler.rb @@ -45,7 +45,7 @@ class ServiceAutoScaler # fill service roles information service.info_roles - next if service.state == Service::STATE['DONE'] + next if service.state != Service::STATE['RUNNING'] Log.info LOG_COMP, 'Checking policies for ' \ diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 7c2a342240..036dd76730 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -53,92 +53,22 @@ class ServiceWD @cloud_auth = @conf[:cloud_auth] @wait_timeout = @cloud_auth.conf[:wait_timeout] @client = client - - @services_nodes = {} - @services_threads = {} - - @nodes_mutex = Mutex.new - @threads_mutex = Mutex.new end - # Start service WD thread + # Start services WD # # @param service_id [Integer] Service ID to watch # @param roles [Array] Service roles with its nodes - def start(service_id, roles) - Log.info LOG_COMP, "Start watching #{service_id}" - - @threads_mutex.synchronize do - @services_threads[service_id] = Thread.new do - start_watching(service_id, roles) - end - end - end - - # Stop service WD thread - # - # @param service_id [Integer] Service ID to stop - def stop(service_id) - Log.info LOG_COMP, "Stop watching #{service_id}" - - @threads_mutex.synchronize do - return if @services_threads[service_id].nil? - - @services_threads[service_id].terminate - - @services_threads.delete(service_id) - end - - stop_watching(service_id) - end - - # Update service nodes - # - # @param service_id [Integer] Service ID to update - # @param role_name [String] Role to update - # @param node [Integer] VM ID to delete - def update(service_id, role_name, node) - subscriber = gen_subscriber - - unsubscribe(service_id, subscriber) - - @nodes_mutex.synchronize do - return if @services_nodes[service_id].nil? - - return if @services_nodes[service_id][role_name].nil? - - @services_nodes[service_id][role_name].delete(node) - - return unless @services_nodes[service_id][role_name].empty? - - # if all role nodes have been deleted, delete the rol - @services_nodes[service_id].delete(role_name) - end - end - - private - - # Start watching service roles nodes - # - # @param service_id [Integer] Service ID to watch - # @param roles [Array] Service roles with its nodes - def start_watching(service_id, roles) - @nodes_mutex.synchronize do - @services_nodes[service_id] = {} - - roles.each do |name, role| - @services_nodes[service_id][name] = {} - @services_nodes[service_id][name] = role.nodes_ids - end - end + def start(service_pool) + Log.info LOG_COMP, 'Start watch dog' # check that all nodes are in RUNNING state, if not, notify - check_roles_state(client, service_id, roles) + check_roles_state(client, service_pool) # subscribe to all nodes subscriber = gen_subscriber - subscribe(service_id, subscriber) + subscribe(subscriber) key = '' content = '' @@ -162,10 +92,11 @@ class ServiceWD xml = Nokogiri::XML(Base64.decode64(content)) - node = xml.xpath("/HOOK_MESSAGE/VM/ID").text.to_i - state = xml.xpath("/HOOK_MESSAGE/STATE").text - lcm_state = xml.xpath("/HOOK_MESSAGE/LCM_STATE").text - role_name = find_by_id(service_id, node) + service_id = split_key[2] + node = xml.xpath('/HOOK_MESSAGE/VM/ID').text.to_i + state = xml.xpath('/HOOK_MESSAGE/STATE').text + lcm_state = xml.xpath('/HOOK_MESSAGE/LCM_STATE').text + role_name = xml.xpath('/HOOK_MESSAGE/VM//ROLE_NAME').text # if the VM is not from the service skip next if role_name.nil? @@ -178,20 +109,18 @@ class ServiceWD end end - # Stop watching service roles nodes - # - # @service_id [Integer] Service ID to stop watching - def stop_watching(service_id) + # Stop service WD thread + def stop + Log.info LOG_COMP, 'Stop watch dog' + # unsubscribe from all nodes subscriber = gen_subscriber - unsubscribe(service_id, subscriber) - - @nodes_mutex.synchronize do - @services_nodes.delete(service_id) - end + unsubscribe(subscriber) end + private + # Get OpenNebula client def client # If there's a client defined use it @@ -214,18 +143,16 @@ class ServiceWD # Subscribe to VM state changes # - # @param service_id [Integer] Service ID to subscribe - # @param subscriber [ZMQ] ZMQ subscriber object - def subscribe(service_id, subscriber) - subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT SERVICE #{service_id}") + # @param subscriber [ZMQ] ZMQ subscriber object + def subscribe(subscriber) + subscriber.setsockopt(ZMQ::SUBSCRIBE, 'EVENT SERVICE') end # Unsubscribe from VM state changes # - # @param service_id [Integer] Service ID to subscribe - # @param subscriber [ZMQ] ZMQ subscriber object - def unsubscribe(service_id, subscriber) - subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT SERVICE #{service_id}") + # @param subscriber [ZMQ] ZMQ subscriber object + def unsubscribe(subscriber) + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT SERVICE') end # Check service roles state @@ -233,10 +160,12 @@ class ServiceWD # @param client [OpenNebula::Client] Client to make API calls # @param service_id [Integer] Service ID to check # @param roles [Array] Service roles with its nodes - def check_roles_state(client, service_id, roles) - roles.each do |name, role| - role.nodes_ids.each do |node| - check_role_state(client, service_id, name, node) + def check_roles_state(client, service_pool) + service_pool.each do |service| + service.roles.each do |name, role| + role.nodes_ids.each do |node| + check_role_state(client, service_id, name, node) + end end end end @@ -245,24 +174,6 @@ class ServiceWD # HELPERS ############################################################################ - # Find role name for a given VM - # - # @param service_id [Integer] Service ID to get role from - # @param node [Integer] VM ID - # - # @return nil if don't find, role_name if found - def find_by_id(service_id, node) - ret = nil - - @nodes_mutex.synchronize do - ret = @services_nodes[service_id].find do |_, nodes| - nodes.include?(node) - end - end - - ret[0] unless ret.nil? - end - # Check role state # # @param client [OpenNebula::Client] Client to make API calls diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index c20ba1a0b8..cf39195b70 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -147,7 +147,7 @@ class HookManagerDriver < OpenNebulaDriver lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' resource_id = xml.xpath('//RESOURCE_ID')[0].text service_id = xml.xpath('//SERVICE_ID')[0] - service_id = service_id.text if service + service_id = service_id.text if service_id ret = ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] From 181cdce470b2b71c01fe5496da3589515f679861 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 18:28:07 +0200 Subject: [PATCH 5/7] M #-: subscribe to service changes --- src/flow/lib/LifeCycleManager.rb | 2 +- src/flow/lib/ServiceWatchDog.rb | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/flow/lib/LifeCycleManager.rb b/src/flow/lib/LifeCycleManager.rb index f2f36243c2..ca69e2982e 100644 --- a/src/flow/lib/LifeCycleManager.rb +++ b/src/flow/lib/LifeCycleManager.rb @@ -91,7 +91,7 @@ class ServiceLCM Thread.new do wd = ServiceWD.new(client, em_conf) - wd.start @srv_pool + wd.start(@srv_pool) end Thread.new do diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 036dd76730..0ccfa13e03 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -62,6 +62,8 @@ class ServiceWD def start(service_pool) Log.info LOG_COMP, 'Start watch dog' + service_pool.info_all + # check that all nodes are in RUNNING state, if not, notify check_roles_state(client, service_pool) @@ -161,10 +163,12 @@ class ServiceWD # @param service_id [Integer] Service ID to check # @param roles [Array] Service roles with its nodes def check_roles_state(client, service_pool) - service_pool.each do |service| + service_pool.each do |service| + service.info + service.roles.each do |name, role| role.nodes_ids.each do |node| - check_role_state(client, service_id, name, node) + check_role_state(client, service.id, name, node) end end end From ccf048cfb8cc8623887b51e43b85cfefba24a324 Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 18:33:42 +0200 Subject: [PATCH 6/7] M #-: subscribe to service changes --- src/flow/lib/ServiceWatchDog.rb | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 0ccfa13e03..14fbd6bf7c 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -57,8 +57,7 @@ class ServiceWD # Start services WD # - # @param service_id [Integer] Service ID to watch - # @param roles [Array] Service roles with its nodes + # @param service_pool [ServicePool] All services to check def start(service_pool) Log.info LOG_COMP, 'Start watch dog' @@ -159,9 +158,8 @@ class ServiceWD # Check service roles state # - # @param client [OpenNebula::Client] Client to make API calls - # @param service_id [Integer] Service ID to check - # @param roles [Array] Service roles with its nodes + # @param client [OpenNebula::Client] Client to make API calls + # @param service_pool [ServicePool] All services to check def check_roles_state(client, service_pool) service_pool.each do |service| service.info From 9ea589abd0f1ed55977a470ffd9897adc601264c Mon Sep 17 00:00:00 2001 From: Alejandro Huertas Date: Mon, 18 May 2020 18:42:28 +0200 Subject: [PATCH 7/7] M #-: subscribe to service changes --- src/flow/lib/ServiceWatchDog.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 14fbd6bf7c..b23551bf9f 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -161,7 +161,7 @@ class ServiceWD # @param client [OpenNebula::Client] Client to make API calls # @param service_pool [ServicePool] All services to check def check_roles_state(client, service_pool) - service_pool.each do |service| + service_pool.each do |service| service.info service.roles.each do |name, role|