diff --git a/src/oca/ruby/opennebula/image.rb b/src/oca/ruby/opennebula/image.rb index 5c367764ab..f5b9c5e1f9 100644 --- a/src/oca/ruby/opennebula/image.rb +++ b/src/oca/ruby/opennebula/image.rb @@ -308,6 +308,8 @@ module OpenNebula end def wait_state(state, timeout=120) + require 'opennebula/wait_ext' + extend OpenNebula::WaitExt rc = wait(state, timeout) diff --git a/src/oca/ruby/opennebula/virtual_machine.rb b/src/oca/ruby/opennebula/virtual_machine.rb index a8c59b3b4e..ce0fbf2a1a 100644 --- a/src/oca/ruby/opennebula/virtual_machine.rb +++ b/src/oca/ruby/opennebula/virtual_machine.rb @@ -771,6 +771,8 @@ module OpenNebula end def wait_state(state, timeout=120) + require 'opennebula/wait_ext' + extend OpenNebula::WaitExt rc = wait2(state, 'LCM_INIT', timeout) diff --git a/src/oca/ruby/opennebula/wait_ext.rb b/src/oca/ruby/opennebula/wait_ext.rb index 3c740474cd..b9fa6b210c 100644 --- a/src/oca/ruby/opennebula/wait_ext.rb +++ b/src/oca/ruby/opennebula/wait_ext.rb @@ -14,19 +14,117 @@ # limitations under the License. # #--------------------------------------------------------------------------- # -require 'ffi-rzmq' require 'opennebula/host' require 'opennebula/image' require 'opennebula/virtual_machine' +module OpenNebula::WaitExtEvent + def wait_event(ctx, event, timeout) + subscriber = ctx.socket(ZMQ::SUB) + + # Create subscriber + key = '' + content = '' + + subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000) + subscriber.setsockopt(ZMQ::SUBSCRIBE, event) + subscriber.connect(@client.one_zmq) + + rc = subscriber.recv_string(key) + rc = subscriber.recv_string(content) if rc != -1 + + return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1 + + content + ensure + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event) + subscriber.close + end + + def wait2(sstr1, sstr2, timeout = 60, cycles = -1) + wfun = OpenNebula::WaitExt::WAIT[self.class] + + # Start with a timeout of 2 seconds, to wait until the first + # info. + # + # The timeout is increased later, to avoid multiple info calls. + c_timeout = 2 + recvs = 0 + in_state = false + + # Subscribe with timeout seconds + # + # Subscribe string: + # + # EVENT STATE element_name/state_str//self.ID + # + # - element_name: is the element name to find in the message + # - self.ID: returns element ID to find in the message + ctx = ZMQ::Context.new(1) + + until in_state || (cycles != -1 && recvs >= cycles) + content = wait_event(ctx, + wfun[:event].call(self, sstr1, sstr2), + c_timeout) + + if content && !content.empty? + in_state = wfun[:in_state_e].call(sstr1, sstr2, content) + + break if in_state + end + + c_timeout *= 10 + c_timeout = timeout if c_timeout > timeout + + rco = info + + return false if OpenNebula.is_error?(rco) + + in_state = wfun[:in_state].call(self, sstr1, sstr2) + + recvs += 1 + end + + in_state + end + +end + +module OpenNebula::WaitExtPolling + def wait2(sstr1, sstr2, timeout = 60, cycles = -1) + wfun = OpenNebula::WaitExt::WAIT[self.class] + + stime = 5 + recvs = 0 + cycles = timeout / stime + in_state = false + + loop do + rco = info + + return false if OpenNebula.is_error?(rco) + + in_state = wfun[:in_state].call(self, sstr1, sstr2) + + recvs += 1 + + break if in_state || recvs >= cycles + + sleep stime + end + + in_state + end + +end + # Module to decorate Wait classes with the following methods: # - Wait # # rubocop:disable Style/ClassAndModuleChildren module OpenNebula::WaitExt - - # Wait classes and the name published in ZMQ + # Wait classes and the name published in ZMQ/STATE WAIT = { OpenNebula::Host => { :event => lambda {|o, s1, _s2| @@ -78,7 +176,7 @@ module OpenNebula::WaitExt }, :in_state => lambda {|o, s1, s2| - obj_s1 = Integer(o['STATE']) + obj_s1 = Integer(o['STATE']) inx_s1 = OpenNebula::VirtualMachine::VM_STATE.index(s1) obj_s2 = Integer(o['LCM_STATE']) @@ -105,93 +203,30 @@ module OpenNebula::WaitExt wait?(obj) class << obj + begin + require 'ffi-rzmq' - # Wait until the element reaches some specific state - # It waits until the state can be found in ZMQ event message - # - # @param state_str [String] State name to wait - # @param timeout [Integer] Number of seconds to timeout event recv - # @param cycles [Integer] Number of recv cycles. After each one - # object status is checked in OpenNebula. - # Use -1 (default) to wait forever. - def wait(state_str, timeout = 60, cycles = -1) - wait2(state_str, '', timeout, cycles) - end - - def wait_event(ctx, event, timeout) - subscriber = ctx.socket(ZMQ::SUB) - - # Create subscriber - key = '' - content = '' - - subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000) - subscriber.setsockopt(ZMQ::SUBSCRIBE, event) - subscriber.connect(@client.one_zmq) - - rc = subscriber.recv_string(key) - rc = subscriber.recv_string(content) if rc != -1 - - return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1 - - content - ensure - subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event) - subscriber.close - end - - def wait2(sstr1, sstr2, timeout = 60, cycles = -1) - wfun = WAIT[self.class] - - # Start with a timeout of 2 seconds, to wait until the first - # info. - # - # The timeout is increased later, to avoid multiple info calls. - c_timeout = 2 - recvs = 0 - in_state = false - - # Subscribe with timeout seconds - # - # Subscribe string: - # - # EVENT STATE element_name/state_str//self.ID - # - # - element_name: is the element name to find in the message - # - self.ID: returns element ID to find in the message - ctx = ZMQ::Context.new(1) - - until in_state || (cycles != -1 && recvs >= cycles) - content = wait_event(ctx, - wfun[:event].call(self, sstr1, sstr2), - c_timeout) - - if content && !content.empty? - in_state = wfun[:in_state_e].call(sstr1, sstr2, content) - - break if in_state - end - - c_timeout *= 10 - c_timeout = timeout if c_timeout > timeout - - rco = info - - return false if OpenNebula.is_error?(rco) - - in_state = wfun[:in_state].call(self, sstr1, sstr2) - - recvs += 1 - end - - in_state - end - + include OpenNebula::WaitExtEvent + rescue LoadError + include OpenNebula::WaitExtPolling + end end super end + # Wait until the element reaches some specific state + # It waits until the state can be found in ZMQ event message + # + # @param state_str [String] State name to wait + # @param timeout [Integer] Number of seconds to timeout event recv + # @param cycles [Integer] Number of recv cycles. After each one + # object status is checked in OpenNebula. + # Use -1 (default) to wait forever. + def wait(state_str, timeout = 60, cycles = -1) + wait2(state_str, '', timeout, cycles) + end + # Check if object has the method wait or not # # @param obj [Object or Class] Object to check class