mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-11 05:17:41 +03:00
F #1428: fix race condition on wait method
co-authored-by: Alejandro Huertas <ahuertas@opennebula.io>
This commit is contained in:
parent
0aea0c6c73
commit
6c898796f1
@ -118,13 +118,38 @@ module OpenNebula::WaitExt
|
||||
wait2(state_str, '', timeout, cycles)
|
||||
end
|
||||
|
||||
def wait_event(ctx, event, timeout)
|
||||
subscriber = ctx.socket(ZMQ::SUB)
|
||||
|
||||
# Create subscriber
|
||||
key = ''
|
||||
content = ''
|
||||
|
||||
subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000)
|
||||
subscriber.setsockopt(ZMQ::SUBSCRIBE, event)
|
||||
subscriber.connect('tcp://localhost:2101')
|
||||
|
||||
rc = subscriber.recv_string(key)
|
||||
rc = subscriber.recv_string(content) if rc != -1
|
||||
|
||||
return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1
|
||||
|
||||
content
|
||||
ensure
|
||||
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event)
|
||||
subscriber.close
|
||||
end
|
||||
|
||||
def wait2(sstr1, sstr2, timeout = 60, cycles = -1)
|
||||
wfun = WAIT[self.class]
|
||||
|
||||
# Create subscriber
|
||||
subscriber = ZMQ::Context.new(1).socket(ZMQ::SUB)
|
||||
key = ''
|
||||
content = ''
|
||||
# Start with a timeout of 2 seconds, to wait until the first
|
||||
# info.
|
||||
#
|
||||
# The timeout is increased later, to avoid multiple info calls.
|
||||
c_timeout = 2
|
||||
recvs = 0
|
||||
in_state = false
|
||||
|
||||
# Subscribe with timeout seconds
|
||||
#
|
||||
@ -134,51 +159,32 @@ module OpenNebula::WaitExt
|
||||
#
|
||||
# - element_name: is the element name to find in the message
|
||||
# - self.ID: returns element ID to find in the message
|
||||
begin
|
||||
subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000)
|
||||
subscriber.setsockopt(
|
||||
ZMQ::SUBSCRIBE,
|
||||
wfun[:event].call(self, sstr1, sstr2)
|
||||
)
|
||||
ctx = ZMQ::Context.new(1)
|
||||
|
||||
subscriber.connect('tcp://localhost:2101')
|
||||
until in_state || (cycles != -1 && recvs >= cycles)
|
||||
content = wait_event(ctx,
|
||||
wfun[:event].call(self, sstr1, sstr2),
|
||||
c_timeout)
|
||||
|
||||
if content && !content.empty?
|
||||
in_state = wfun[:in_state_e].call(sstr1, sstr2, content)
|
||||
|
||||
break if in_state
|
||||
end
|
||||
|
||||
c_timeout *= 10
|
||||
c_timeout = timeout if c_timeout > timeout
|
||||
|
||||
rco = info
|
||||
|
||||
return false if OpenNebula.is_error?(rco)
|
||||
|
||||
in_state = wfun[:in_state].call(self, sstr1, sstr2)
|
||||
recvs = 0
|
||||
|
||||
until in_state || (cycles != -1 && recvs >= cycles)
|
||||
rc = subscriber.recv_string(key)
|
||||
rc = subscriber.recv_string(content) if rc != -1
|
||||
|
||||
if rc == -1
|
||||
return false if ZMQ::Util.errno != ZMQ::EAGAIN
|
||||
|
||||
rco = info
|
||||
|
||||
return false if OpenNebula.is_error?(rco)
|
||||
|
||||
in_state = wfun[:in_state].call(self, sstr1, sstr2)
|
||||
else
|
||||
in_state = wfun[:in_state_e].call(sstr1, sstr2,
|
||||
content)
|
||||
end
|
||||
|
||||
recvs += 1
|
||||
end
|
||||
|
||||
in_state
|
||||
ensure
|
||||
subscriber.setsockopt(
|
||||
ZMQ::UNSUBSCRIBE,
|
||||
wfun[:event].call(self, sstr1, sstr2)
|
||||
)
|
||||
|
||||
subscriber.close
|
||||
recvs += 1
|
||||
end
|
||||
|
||||
in_state
|
||||
end
|
||||
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user