From 6c898796f1c4d385571812a469e52d4c19a8e360 Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Thu, 18 Feb 2021 18:06:25 +0100 Subject: [PATCH] F #1428: fix race condition on wait method co-authored-by: Alejandro Huertas --- src/oca/ruby/opennebula/wait_ext.rb | 86 +++++++++++++++-------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/src/oca/ruby/opennebula/wait_ext.rb b/src/oca/ruby/opennebula/wait_ext.rb index 5df9f1f395..e2e8dac746 100644 --- a/src/oca/ruby/opennebula/wait_ext.rb +++ b/src/oca/ruby/opennebula/wait_ext.rb @@ -118,13 +118,38 @@ module OpenNebula::WaitExt wait2(state_str, '', timeout, cycles) end + def wait_event(ctx, event, timeout) + subscriber = ctx.socket(ZMQ::SUB) + + # Create subscriber + key = '' + content = '' + + subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000) + subscriber.setsockopt(ZMQ::SUBSCRIBE, event) + subscriber.connect('tcp://localhost:2101') + + rc = subscriber.recv_string(key) + rc = subscriber.recv_string(content) if rc != -1 + + return if ZMQ::Util.errno == ZMQ::EAGAIN || rc == -1 + + content + ensure + subscriber.setsockopt(ZMQ::UNSUBSCRIBE, event) + subscriber.close + end + def wait2(sstr1, sstr2, timeout = 60, cycles = -1) wfun = WAIT[self.class] - # Create subscriber - subscriber = ZMQ::Context.new(1).socket(ZMQ::SUB) - key = '' - content = '' + # Start with a timeout of 2 seconds, to wait until the first + # info. + # + # The timeout is increased later, to avoid multiple info calls. + c_timeout = 2 + recvs = 0 + in_state = false # Subscribe with timeout seconds # @@ -134,51 +159,32 @@ module OpenNebula::WaitExt # # - element_name: is the element name to find in the message # - self.ID: returns element ID to find in the message - begin - subscriber.setsockopt(ZMQ::RCVTIMEO, timeout * 1000) - subscriber.setsockopt( - ZMQ::SUBSCRIBE, - wfun[:event].call(self, sstr1, sstr2) - ) + ctx = ZMQ::Context.new(1) - subscriber.connect('tcp://localhost:2101') + until in_state || (cycles != -1 && recvs >= cycles) + content = wait_event(ctx, + wfun[:event].call(self, sstr1, sstr2), + c_timeout) + + if content && !content.empty? + in_state = wfun[:in_state_e].call(sstr1, sstr2, content) + + break if in_state + end + + c_timeout *= 10 + c_timeout = timeout if c_timeout > timeout rco = info return false if OpenNebula.is_error?(rco) in_state = wfun[:in_state].call(self, sstr1, sstr2) - recvs = 0 - until in_state || (cycles != -1 && recvs >= cycles) - rc = subscriber.recv_string(key) - rc = subscriber.recv_string(content) if rc != -1 - - if rc == -1 - return false if ZMQ::Util.errno != ZMQ::EAGAIN - - rco = info - - return false if OpenNebula.is_error?(rco) - - in_state = wfun[:in_state].call(self, sstr1, sstr2) - else - in_state = wfun[:in_state_e].call(sstr1, sstr2, - content) - end - - recvs += 1 - end - - in_state - ensure - subscriber.setsockopt( - ZMQ::UNSUBSCRIBE, - wfun[:event].call(self, sstr1, sstr2) - ) - - subscriber.close + recvs += 1 end + + in_state end end