mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-21 14:50:08 +03:00
F #5284: Fallback in polling if no zeromq
This commit is contained in:
parent
15f41d53c4
commit
d1bd613fed
@ -308,6 +308,8 @@ module OpenNebula
|
||||
end
|
||||
|
||||
def wait_state(state, timeout=120)
|
||||
require 'opennebula/wait_ext'
|
||||
|
||||
extend OpenNebula::WaitExt
|
||||
|
||||
rc = wait(state, timeout)
|
||||
|
@ -771,6 +771,8 @@ module OpenNebula
|
||||
end
|
||||
|
||||
def wait_state(state, timeout=120)
|
||||
require 'opennebula/wait_ext'
|
||||
|
||||
extend OpenNebula::WaitExt
|
||||
|
||||
rc = wait2(state, 'LCM_INIT', timeout)
|
||||
|
@ -14,19 +14,117 @@
|
||||
# limitations under the License. #
|
||||
#--------------------------------------------------------------------------- #
|
||||
|
||||
require 'ffi-rzmq'
|
||||
|
||||
require 'opennebula/host'
|
||||
require 'opennebula/image'
|
||||
require 'opennebula/virtual_machine'
|
||||
|
||||
module OpenNebula::WaitExtEvent
|
||||
def wait_event(ctx, event, timeout)
|
||||
subscriber = ctx.socket(ZMQ::SUB)
|
||||
|
||||
# Create subscriber
|
||||
key = ''
|
||||
content = ''
|
||||
|
||||
subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000)
|
||||
subscriber.setsockopt(ZMQ::SUBSCRIBE, event)
|
||||
subscriber.connect(@client.one_zmq)
|
||||
|
||||
rc = subscriber.recv_string(key)
|
||||
rc = subscriber.recv_string(content) if rc != -1
|
||||
|
||||
return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1
|
||||
|
||||
content
|
||||
ensure
|
||||
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event)
|
||||
subscriber.close
|
||||
end
|
||||
|
||||
def wait2(sstr1, sstr2, timeout = 60, cycles = -1)
|
||||
wfun = OpenNebula::WaitExt::WAIT[self.class]
|
||||
|
||||
# Start with a timeout of 2 seconds, to wait until the first
|
||||
# info.
|
||||
#
|
||||
# The timeout is increased later, to avoid multiple info calls.
|
||||
c_timeout = 2
|
||||
recvs = 0
|
||||
in_state = false
|
||||
|
||||
# Subscribe with timeout seconds
|
||||
#
|
||||
# Subscribe string:
|
||||
#
|
||||
# EVENT STATE element_name/state_str//self.ID
|
||||
#
|
||||
# - element_name: is the element name to find in the message
|
||||
# - self.ID: returns element ID to find in the message
|
||||
ctx = ZMQ::Context.new(1)
|
||||
|
||||
until in_state || (cycles != -1 && recvs >= cycles)
|
||||
content = wait_event(ctx,
|
||||
wfun[:event].call(self, sstr1, sstr2),
|
||||
c_timeout)
|
||||
|
||||
if content && !content.empty?
|
||||
in_state = wfun[:in_state_e].call(sstr1, sstr2, content)
|
||||
|
||||
break if in_state
|
||||
end
|
||||
|
||||
c_timeout *= 10
|
||||
c_timeout = timeout if c_timeout > timeout
|
||||
|
||||
rco = info
|
||||
|
||||
return false if OpenNebula.is_error?(rco)
|
||||
|
||||
in_state = wfun[:in_state].call(self, sstr1, sstr2)
|
||||
|
||||
recvs += 1
|
||||
end
|
||||
|
||||
in_state
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
module OpenNebula::WaitExtPolling
|
||||
def wait2(sstr1, sstr2, timeout = 60, cycles = -1)
|
||||
wfun = OpenNebula::WaitExt::WAIT[self.class]
|
||||
|
||||
stime = 5
|
||||
recvs = 0
|
||||
cycles = timeout / stime
|
||||
in_state = false
|
||||
|
||||
loop do
|
||||
rco = info
|
||||
|
||||
return false if OpenNebula.is_error?(rco)
|
||||
|
||||
in_state = wfun[:in_state].call(self, sstr1, sstr2)
|
||||
|
||||
recvs += 1
|
||||
|
||||
break if in_state || recvs >= cycles
|
||||
|
||||
sleep stime
|
||||
end
|
||||
|
||||
in_state
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
# Module to decorate Wait classes with the following methods:
|
||||
# - Wait
|
||||
#
|
||||
# rubocop:disable Style/ClassAndModuleChildren
|
||||
module OpenNebula::WaitExt
|
||||
|
||||
# Wait classes and the name published in ZMQ
|
||||
# Wait classes and the name published in ZMQ/STATE
|
||||
WAIT = {
|
||||
OpenNebula::Host => {
|
||||
:event => lambda {|o, s1, _s2|
|
||||
@ -78,7 +176,7 @@ module OpenNebula::WaitExt
|
||||
},
|
||||
|
||||
:in_state => lambda {|o, s1, s2|
|
||||
obj_s1 = Integer(o['STATE'])
|
||||
obj_s1 = Integer(o['STATE'])
|
||||
inx_s1 = OpenNebula::VirtualMachine::VM_STATE.index(s1)
|
||||
|
||||
obj_s2 = Integer(o['LCM_STATE'])
|
||||
@ -105,93 +203,30 @@ module OpenNebula::WaitExt
|
||||
wait?(obj)
|
||||
|
||||
class << obj
|
||||
begin
|
||||
require 'ffi-rzmq'
|
||||
|
||||
# Wait until the element reaches some specific state
|
||||
# It waits until the state can be found in ZMQ event message
|
||||
#
|
||||
# @param state_str [String] State name to wait
|
||||
# @param timeout [Integer] Number of seconds to timeout event recv
|
||||
# @param cycles [Integer] Number of recv cycles. After each one
|
||||
# object status is checked in OpenNebula.
|
||||
# Use -1 (default) to wait forever.
|
||||
def wait(state_str, timeout = 60, cycles = -1)
|
||||
wait2(state_str, '', timeout, cycles)
|
||||
end
|
||||
|
||||
def wait_event(ctx, event, timeout)
|
||||
subscriber = ctx.socket(ZMQ::SUB)
|
||||
|
||||
# Create subscriber
|
||||
key = ''
|
||||
content = ''
|
||||
|
||||
subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000)
|
||||
subscriber.setsockopt(ZMQ::SUBSCRIBE, event)
|
||||
subscriber.connect(@client.one_zmq)
|
||||
|
||||
rc = subscriber.recv_string(key)
|
||||
rc = subscriber.recv_string(content) if rc != -1
|
||||
|
||||
return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1
|
||||
|
||||
content
|
||||
ensure
|
||||
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event)
|
||||
subscriber.close
|
||||
end
|
||||
|
||||
def wait2(sstr1, sstr2, timeout = 60, cycles = -1)
|
||||
wfun = WAIT[self.class]
|
||||
|
||||
# Start with a timeout of 2 seconds, to wait until the first
|
||||
# info.
|
||||
#
|
||||
# The timeout is increased later, to avoid multiple info calls.
|
||||
c_timeout = 2
|
||||
recvs = 0
|
||||
in_state = false
|
||||
|
||||
# Subscribe with timeout seconds
|
||||
#
|
||||
# Subscribe string:
|
||||
#
|
||||
# EVENT STATE element_name/state_str//self.ID
|
||||
#
|
||||
# - element_name: is the element name to find in the message
|
||||
# - self.ID: returns element ID to find in the message
|
||||
ctx = ZMQ::Context.new(1)
|
||||
|
||||
until in_state || (cycles != -1 && recvs >= cycles)
|
||||
content = wait_event(ctx,
|
||||
wfun[:event].call(self, sstr1, sstr2),
|
||||
c_timeout)
|
||||
|
||||
if content && !content.empty?
|
||||
in_state = wfun[:in_state_e].call(sstr1, sstr2, content)
|
||||
|
||||
break if in_state
|
||||
end
|
||||
|
||||
c_timeout *= 10
|
||||
c_timeout = timeout if c_timeout > timeout
|
||||
|
||||
rco = info
|
||||
|
||||
return false if OpenNebula.is_error?(rco)
|
||||
|
||||
in_state = wfun[:in_state].call(self, sstr1, sstr2)
|
||||
|
||||
recvs += 1
|
||||
end
|
||||
|
||||
in_state
|
||||
end
|
||||
|
||||
include OpenNebula::WaitExtEvent
|
||||
rescue LoadError
|
||||
include OpenNebula::WaitExtPolling
|
||||
end
|
||||
end
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
# Wait until the element reaches some specific state
|
||||
# It waits until the state can be found in ZMQ event message
|
||||
#
|
||||
# @param state_str [String] State name to wait
|
||||
# @param timeout [Integer] Number of seconds to timeout event recv
|
||||
# @param cycles [Integer] Number of recv cycles. After each one
|
||||
# object status is checked in OpenNebula.
|
||||
# Use -1 (default) to wait forever.
|
||||
def wait(state_str, timeout = 60, cycles = -1)
|
||||
wait2(state_str, '', timeout, cycles)
|
||||
end
|
||||
|
||||
# Check if object has the method wait or not
|
||||
#
|
||||
# @param obj [Object or Class] Object to check class
|
||||
|
Loading…
x
Reference in New Issue
Block a user