diff --git a/include/VirtualMachine.h b/include/VirtualMachine.h index 85ba7b0528..a55b194083 100644 --- a/include/VirtualMachine.h +++ b/include/VirtualMachine.h @@ -238,7 +238,6 @@ public: resched = do_sched ? 1 : 0; }; - // ------------------------------------------------------------------------- // Log & Print // ------------------------------------------------------------------------- diff --git a/src/flow/lib/LifeCycleManager.rb b/src/flow/lib/LifeCycleManager.rb index c817d928a3..ca69e2982e 100644 --- a/src/flow/lib/LifeCycleManager.rb +++ b/src/flow/lib/LifeCycleManager.rb @@ -58,7 +58,6 @@ class ServiceLCM } @event_manager = EventManager.new(em_conf).am - @wd = ServiceWD.new(client, em_conf) # Register Action Manager actions @am.register_action(ACTIONS['DEPLOY_CB'], @@ -90,6 +89,11 @@ class ServiceLCM Thread.new { catch_up(client) } + Thread.new do + wd = ServiceWD.new(client, em_conf) + wd.start(@srv_pool) + end + Thread.new do auto_scaler = ServiceAutoScaler.new(@srv_pool, client, @@ -239,9 +243,6 @@ class ServiceLCM if service.all_roles_running? service.set_state(Service::STATE['RUNNING']) service.update - - # start watchdog - @wd.start(service.id, service.roles) end # If there is no node in PENDING the service is not modified. @@ -287,9 +288,6 @@ class ServiceLCM ) end - # stop watchdog - @wd.stop(service.id) - set_deploy_strategy(service) roles = service.roles_shutdown @@ -343,9 +341,6 @@ class ServiceLCM ) end - # stop watchdog - @wd.stop(service.id) - role = service.roles[role_name] if role.nil? @@ -482,9 +477,6 @@ class ServiceLCM if service.all_roles_running? service.set_state(Service::STATE['RUNNING']) - - # start watching the service - @wd.start(service.id, service.roles) elsif service.strategy == 'straight' set_deploy_strategy(service) @@ -655,9 +647,6 @@ class ServiceLCM service.set_state(Service::STATE['RUNNING']) service.roles[role_name].set_state(Role::STATE['RUNNING']) - # start watching the service - @wd.start(service.id, service.roles) - service.update end @@ -694,8 +683,6 @@ class ServiceLCM Log.info 'WD', "Update #{service_id}:#{role_name} " \ "cardinality to #{cardinality}" - - @wd.update(service.id, role_name, node) end Log.error 'WD', rc.message if OpenNebula.is_error?(rc) @@ -728,13 +715,6 @@ class ServiceLCM @srv_pool.each do |service| recover_action(client, service.id) if service.transient_state? - - service.info - - if Service::STATE['RUNNING'] == service.state || - Service::STATE['WARNING'] == service.state - @wd.start(service.id, service.roles) - end end end diff --git a/src/flow/lib/ServiceAutoScaler.rb b/src/flow/lib/ServiceAutoScaler.rb index ada5e1cbb2..44ede7fc39 100644 --- a/src/flow/lib/ServiceAutoScaler.rb +++ b/src/flow/lib/ServiceAutoScaler.rb @@ -45,7 +45,7 @@ class ServiceAutoScaler # fill service roles information service.info_roles - next if service.state == Service::STATE['DONE'] + next if service.state != Service::STATE['RUNNING'] Log.info LOG_COMP, 'Checking policies for ' \ diff --git a/src/flow/lib/ServiceWatchDog.rb b/src/flow/lib/ServiceWatchDog.rb index 20f614c2de..b23551bf9f 100644 --- a/src/flow/lib/ServiceWatchDog.rb +++ b/src/flow/lib/ServiceWatchDog.rb @@ -15,7 +15,9 @@ # limitations under the License. # #--------------------------------------------------------------------------- # +require 'base64' require 'ffi-rzmq' +require 'nokogiri' require 'EventManager' # Service watchdog class @@ -51,98 +53,23 @@ class ServiceWD @cloud_auth = @conf[:cloud_auth] @wait_timeout = @cloud_auth.conf[:wait_timeout] @client = client - - @services_nodes = {} - @services_threads = {} - - @nodes_mutex = Mutex.new - @threads_mutex = Mutex.new end - # Start service WD thread + # Start services WD # - # @param service_id [Integer] Service ID to watch - # @param roles [Array] Service roles with its nodes - def start(service_id, roles) - Log.info LOG_COMP, "Start watching #{service_id}" + # @param service_pool [ServicePool] All services to check + def start(service_pool) + Log.info LOG_COMP, 'Start watch dog' - @threads_mutex.synchronize do - @services_threads[service_id] = Thread.new do - start_watching(service_id, roles) - end - end - end - - # Stop service WD thread - # - # @param service_id [Integer] Service ID to stop - def stop(service_id) - Log.info LOG_COMP, "Stop watching #{service_id}" - - @threads_mutex.synchronize do - return if @services_threads[service_id].nil? - - @services_threads[service_id].terminate - - @services_threads.delete(service_id) - end - - stop_watching(service_id) - end - - # Update service nodes - # - # @param service_id [Integer] Service ID to update - # @param role_name [String] Role to update - # @param node [Integer] VM ID to delete - def update(service_id, role_name, node) - subscriber = gen_subscriber - - unsubscribe(node, subscriber) - - @nodes_mutex.synchronize do - return if @services_nodes[service_id].nil? - - return if @services_nodes[service_id][role_name].nil? - - @services_nodes[service_id][role_name].delete(node) - - return unless @services_nodes[service_id][role_name].empty? - - # if all role nodes have been deleted, delete the rol - @services_nodes[service_id].delete(role_name) - end - end - - private - - # Start watching service roles nodes - # - # @param service_id [Integer] Service ID to watch - # @param roles [Array] Service roles with its nodes - def start_watching(service_id, roles) - @nodes_mutex.synchronize do - @services_nodes[service_id] = {} - - roles.each do |name, role| - @services_nodes[service_id][name] = {} - @services_nodes[service_id][name] = role.nodes_ids - end - end + service_pool.info_all # check that all nodes are in RUNNING state, if not, notify - check_roles_state(client, service_id, roles) + check_roles_state(client, service_pool) # subscribe to all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - subscribe(node, subscriber) - end - end - end + subscribe(subscriber) key = '' content = '' @@ -156,19 +83,21 @@ class ServiceWD Log.error LOG_COMP, 'Error reading from subscriber.' end - # key format: EVENT VM VM_ID/STATE/LCM_STATE + # key format: EVENT SERVICE SERVICE_ID next if key.nil? split_key = key.split # if there is no data skip - next if split_key[2].nil? + next if content.nil? - split_key = key.split[2].split('/') - node = split_key[0].to_i - state = split_key[1] - lcm_state = split_key[2] - role_name = find_by_id(service_id, node) + xml = Nokogiri::XML(Base64.decode64(content)) + + service_id = split_key[2] + node = xml.xpath('/HOOK_MESSAGE/VM/ID').text.to_i + state = xml.xpath('/HOOK_MESSAGE/STATE').text + lcm_state = xml.xpath('/HOOK_MESSAGE/LCM_STATE').text + role_name = xml.xpath('/HOOK_MESSAGE/VM//ROLE_NAME').text # if the VM is not from the service skip next if role_name.nil? @@ -181,24 +110,18 @@ class ServiceWD end end - # Stop watching service roles nodes - # - # @service_id [Integer] Service ID to stop watching - def stop_watching(service_id) + # Stop service WD thread + def stop + Log.info LOG_COMP, 'Stop watch dog' + # unsubscribe from all nodes subscriber = gen_subscriber - @nodes_mutex.synchronize do - @services_nodes[service_id].each do |_, nodes| - nodes.each do |node| - unsubscribe(node, subscriber) - end - end - - @services_nodes.delete(service_id) - end + unsubscribe(subscriber) end + private + # Get OpenNebula client def client # If there's a client defined use it @@ -221,29 +144,30 @@ class ServiceWD # Subscribe to VM state changes # - # @param vm_id [Integer] VM ID to subscribe - # @param subscriber [ZMQ] ZMQ subscriber object - def subscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT VM #{vm_id}") + # @param subscriber [ZMQ] ZMQ subscriber object + def subscribe(subscriber) + subscriber.setsockopt(ZMQ::SUBSCRIBE, 'EVENT SERVICE') end # Unsubscribe from VM state changes # - # @param vm_id [Integer] VM ID to unsubscribe - # @param subscriber [ZMQ] ZMQ subscriber object - def unsubscribe(vm_id, subscriber) - subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT VM #{vm_id}") + # @param subscriber [ZMQ] ZMQ subscriber object + def unsubscribe(subscriber) + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT SERVICE') end # Check service roles state # - # @param client [OpenNebula::Client] Client to make API calls - # @param service_id [Integer] Service ID to check - # @param roles [Array] Service roles with its nodes - def check_roles_state(client, service_id, roles) - roles.each do |name, role| - role.nodes_ids.each do |node| - check_role_state(client, service_id, name, node) + # @param client [OpenNebula::Client] Client to make API calls + # @param service_pool [ServicePool] All services to check + def check_roles_state(client, service_pool) + service_pool.each do |service| + service.info + + service.roles.each do |name, role| + role.nodes_ids.each do |node| + check_role_state(client, service.id, name, node) + end end end end @@ -252,24 +176,6 @@ class ServiceWD # HELPERS ############################################################################ - # Find role name for a given VM - # - # @param service_id [Integer] Service ID to get role from - # @param node [Integer] VM ID - # - # @return nil if don't find, role_name if found - def find_by_id(service_id, node) - ret = nil - - @nodes_mutex.synchronize do - ret = @services_nodes[service_id].find do |_, nodes| - nodes.include?(node) - end - end - - ret[0] unless ret.nil? - end - # Check role state # # @param client [OpenNebula::Client] Client to make API calls diff --git a/src/hm/HookStateVM.cc b/src/hm/HookStateVM.cc index 9f2d1a88aa..7ebaa53c5a 100644 --- a/src/hm/HookStateVM.cc +++ b/src/hm/HookStateVM.cc @@ -30,7 +30,7 @@ string * HookStateVM::format_message(VirtualMachine * vm) { std::ostringstream oss; std::string vm_xml; - std::string state, lcm_state; + std::string state, lcm_state, service_id; oss << "" << "STATE" @@ -39,6 +39,13 @@ string * HookStateVM::format_message(VirtualMachine * vm) << "" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "" << "" << vm->get_oid() << ""; + vm->get_user_template_attribute("SERVICE_ID", service_id); + + if ( !service_id.empty() ) + { + oss << "" << service_id << ""; + } + if ( vm->hasHistory() ) { oss << "" << vm->get_hostname() << ""; diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index 6cadb65952..cf39195b70 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -146,9 +146,15 @@ class HookManagerDriver < OpenNebulaDriver state = xml.xpath('//STATE')[0].text lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM' resource_id = xml.xpath('//RESOURCE_ID')[0].text + service_id = xml.xpath('//SERVICE_ID')[0] + service_id = service_id.text if service_id - ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", - "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] + ret = ["#{obj} #{resource_id}/#{state}/#{lcm_state} ", + "STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "] + + ret << "SERVICE #{service_id} " if service_id + + ret else [''] end