diff --git a/src/flow/lib/EventManager.rb b/src/flow/lib/EventManager.rb index 925104940b..1d905605d9 100644 --- a/src/flow/lib/EventManager.rb +++ b/src/flow/lib/EventManager.rb @@ -51,6 +51,13 @@ class EventManager PROLOG_UNDEPLOY_FAILURE ] + SUBSCRIBE_STATES = %w[ + STOPPED + SUSPENDED + POWEROFF + UNDEPLOYED + ] + # -------------------------------------------------------------------------- # Default configuration options for the module # -------------------------------------------------------------------------- @@ -108,7 +115,8 @@ class EventManager service_id, client, service_id, - role_name) + role_name, + rc[1]) else @lcm.trigger_action(:deploy_failure_cb, service_id, @@ -163,7 +171,8 @@ class EventManager service_id, client, service_id, - role_name) + role_name, + rc[1]) else @lcm.trigger_action(:scaleup_failure_cb, service_id, @@ -225,11 +234,17 @@ class EventManager def wait(nodes, state, lcm_state) subscriber = gen_subscriber - rc_nodes = { :successful => [], :failure => [] } + rc_nodes = { :successful => {}, :failure => {} } rc = check_nodes(nodes, state, lcm_state, subscriber) - rc_nodes[:successful].concat(rc[:successful]) - rc_nodes[:failure].concat(rc[:failure]) + # rc_nodes[:successful] has the following structure + # + # node_id => boolean + # + # = true means the VM was deleted by external user + # = false means the VM state is in SUBSCRIBE_STATES + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) if nodes.empty? && rc_nodes[:failure].empty? subscriber.close @@ -239,6 +254,10 @@ class EventManager nodes.each do |node| subscribe(node, state, lcm_state, subscriber) + + (SUBSCRIBE_STATES + ['DONE']).each do |s| + subscribe(node, s, 'LCM_INIT', subscriber) + end end key = '' @@ -249,20 +268,24 @@ class EventManager rc = subscriber.recv_string(content) if rc != -1 if rc == -1 && ZMQ::Util.errno != ZMQ::EAGAIN - Log.error LOG_COMP, 'Error reading from subscriber.' + next Log.error LOG_COMP, 'Error reading from subscriber.' elsif rc == -1 Log.info LOG_COMP, "Timeout reached for VM #{nodes} =>"\ " (#{state}, #{lcm_state})" rc = check_nodes(nodes, state, lcm_state, subscriber) - rc_nodes[:successful].concat(rc[:successful]) - rc_nodes[:failure].concat(rc[:failure]) + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) next if !nodes.empty? && rc_nodes[:failure].empty? nodes.each do |id| unsubscribe(id, state, lcm_state, subscriber) + + (SUBSCRIBE_STATES + ['DONE']).each do |s| + unsubscribe(id, s, 'LCM_INIT', subscriber) + end end # If any node is in error wait action will fails @@ -272,11 +295,18 @@ class EventManager end id = retrieve_id(key) - Log.info LOG_COMP, "Node #{id} reached (#{state},#{lcm_state})" + rc = check_nodes([id], state, lcm_state, subscriber) + + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) nodes.delete(id) + unsubscribe(id, state, lcm_state, subscriber) - rc_nodes[:successful] << id + + (SUBSCRIBE_STATES + ['DONE']).each do |s| + unsubscribe(id, s, 'LCM_INIT', subscriber) + end end subscriber.close @@ -285,11 +315,17 @@ class EventManager end def wait_report_ready(nodes) - rc_nodes = { :successful => [], :failure => [] } + rc_nodes = { :successful => {}, :failure => {} } rc = check_nodes_report(nodes) - rc_nodes[:successful].concat(rc[:successful]) - rc_nodes[:failure].concat(rc[:failure]) + # rc_nodes[:successful] has the following structure + # + # node_id => boolean + # + # = true means the VM was deleted by external user + # = false means the VM state is in SUBSCRIBE_STATES + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) return [true, rc_nodes] if nodes.empty? && rc_nodes[:failure].empty? @@ -297,6 +333,12 @@ class EventManager subscriber.setsockopt(ZMQ::SUBSCRIBE, 'EVENT API one.vm.update 1') + nodes.each do |node| + (SUBSCRIBE_STATES + ['DONE']).each do |s| + subscribe(node, s, 'LCM_INIT', subscriber) + end + end + key = '' content = '' @@ -312,14 +354,20 @@ class EventManager rc = check_nodes_report(nodes) - rc_nodes[:successful].concat(rc[:successful]) - rc_nodes[:failure].concat(rc[:failure]) + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) next if !nodes.empty? && rc_nodes[:failure].empty? subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT API one.vm.update 1') + nodes.each do |node| + (SUBSCRIBE_STATES + ['DONE']).each do |s| + unsubscribe(node, s, 'LCM_INIT', subscriber) + end + end + # If any node is in error wait action will fails return [false, rc_nodes] unless rc_nodes[:failure].empty? @@ -328,6 +376,22 @@ class EventManager # rubocop:enable Style/GuardClause + id = retrieve_id(key) + rc = check_nodes_report([id]) + + rc_nodes[:successful].merge!(rc[:successful]) + rc_nodes[:failure].merge!(rc[:failure]) + + unless rc[:successful].empty? + nodes.delete(id) + + (SUBSCRIBE_STATES + ['DONE']).each do |s| + unsubscribe(id, s, 'LCM_INIT', subscriber) + end + + next + end + xml = Nokogiri::XML(Base64.decode64(content)) id = xml.xpath( @@ -348,8 +412,12 @@ class EventManager Log.info LOG_COMP, "Node #{id} reported ready" + (SUBSCRIBE_STATES + ['DONE']).each do |s| + unsubscribe(id, s, 'LCM_INIT', subscriber) + end + nodes.delete(id) - rc_nodes[:successful] << id + rc_nodes[:successful][id] = false end subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT API one.vm.update 1') @@ -360,7 +428,7 @@ class EventManager end def check_nodes(nodes, state, lcm_state, subscriber) - rc_nodes = { :successful => [], :failure => [] } + rc_nodes = { :successful => {}, :failure => {} } nodes.delete_if do |node| vm = OpenNebula::VirtualMachine @@ -371,18 +439,24 @@ class EventManager vm_state = OpenNebula::VirtualMachine::VM_STATE[vm.state] vm_lcm_state = OpenNebula::VirtualMachine::LCM_STATE[vm.lcm_state] + Log.info LOG_COMP, + "Node #{node} reached (#{vm_state},#{vm_lcm_state})" + if vm_state == 'DONE' || (vm_state == state && vm_lcm_state == lcm_state) unsubscribe(node, state, lcm_state, subscriber) - rc_nodes[:successful] << node + rc_nodes[:successful][node] = true + next true + elsif SUBSCRIBE_STATES.include?(vm_state) + rc_nodes[:successful][node] = false next true end if FAILURE_STATES.include? vm_lcm_state Log.error LOG_COMP, "Node #{node} is in FAILURE state" - rc_nodes[:failure] << node + rc_nodes[:failure][node] = false next true end @@ -394,7 +468,7 @@ class EventManager end def check_nodes_report(nodes) - rc_nodes = { :successful => [], :failure => [] } + rc_nodes = { :successful => {}, :failure => {} } nodes.delete_if do |node| vm = OpenNebula::VirtualMachine.new_with_id(node, @@ -402,11 +476,20 @@ class EventManager vm.info + vm_state = OpenNebula::VirtualMachine::VM_STATE[vm.state] vm_lcm_state = OpenNebula::VirtualMachine::LCM_STATE[vm.lcm_state] + if vm_state == 'DONE' + rc_nodes[:successful][node] = true + next true + elsif SUBSCRIBE_STATES.include?(vm_state) + rc_nodes[:successful][node] = false + next true + end + if vm['/VM/USER_TEMPLATE/READY'] && vm['/VM/USER_TEMPLATE/READY'].strip == 'YES' - rc_nodes[:successful] << node + rc_nodes[:successful][node] = false next true end @@ -415,7 +498,7 @@ class EventManager if FAILURE_STATES.include? vm_lcm_state Log.error LOG_COMP, "Node #{node} is in FAILURE state" - rc_nodes[:failure] << node + rc_nodes[:failure][node] = false next true end diff --git a/src/flow/lib/LifeCycleManager.rb b/src/flow/lib/LifeCycleManager.rb index c0531291b3..e2c6011a59 100644 --- a/src/flow/lib/LifeCycleManager.rb +++ b/src/flow/lib/LifeCycleManager.rb @@ -478,10 +478,18 @@ class ServiceLCM # Callbacks ############################################################################ - def deploy_cb(client, service_id, role_name) + def deploy_cb(client, service_id, role_name, nodes) rc = @srv_pool.get(service_id, client) do |service| service.roles[role_name].set_state(Role::STATE['RUNNING']) + service.roles[role_name].nodes.delete_if do |node| + if nodes[node] && service.roles[role_name].cardinalitty > 0 + service.roles[role_name].cardinality -= 1 + end + + nodes[node] + end + if service.all_roles_running? service.set_state(Service::STATE['RUNNING']) elsif service.strategy == 'straight' @@ -573,8 +581,16 @@ class ServiceLCM Log.error LOG_COMP, rc.message if OpenNebula.is_error?(rc) end - def scaleup_cb(client, service_id, role_name) + def scaleup_cb(client, service_id, role_name, nodes) rc = @srv_pool.get(service_id, client) do |service| + service.roles[role_name].nodes.delete_if do |node| + if nodes[node] && service.roles[role_name].cardinalitty > 0 + service.roles[role_name].cardinality -= 1 + end + + nodes[node] + end + service.set_state(Service::STATE['COOLDOWN']) service.roles[role_name].set_state(Role::STATE['COOLDOWN'])