From 4e6b2d410c8a1f947223d358b45b706a7a2b0b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Gonz=C3=A1lez?= Date: Wed, 24 Feb 2021 09:34:44 +0100 Subject: [PATCH] F #5256: enable multithreading for one_hm (#812) --- src/hm_mad/one_hm.rb | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/hm_mad/one_hm.rb b/src/hm_mad/one_hm.rb index ea4c03df66..99bff4845c 100755 --- a/src/hm_mad/one_hm.rb +++ b/src/hm_mad/one_hm.rb @@ -51,7 +51,7 @@ class HookManagerDriver < OpenNebulaDriver # -------------------------------------------------------------------------- DEFAULT_CONF = { :concurrency => 15, - :threaded => false, + :threaded => true, :retries => 0, :publisher_port => 2101, :logger_port => 2102, @@ -61,7 +61,6 @@ class HookManagerDriver < OpenNebulaDriver def initialize(options) @options = DEFAULT_CONF.merge(options) - @options[:concurrency] = 1 # Only on thread using the publisher socket super('', @options) @@ -78,6 +77,9 @@ class HookManagerDriver < OpenNebulaDriver @replier = context.socket(ZMQ::REP) @replier.bind("tcp://#{@options[:bind]}:#{@options[:logger_port]}") + # Lock to sync access to @publisher + @publisher_lock = Mutex.new + Thread.new do receiver_thread end @@ -91,10 +93,7 @@ class HookManagerDriver < OpenNebulaDriver key(type, arg_xml).each do |key| m_key = "EVENT #{key}" - # Using envelopes for splitting key/val - # http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes - send_string(@publisher, m_key, ZMQ::SNDMORE) - send_string(@publisher, arguments.flatten[0]) + publish_message(@publisher, m_key, arguments.flatten[0]) end rescue StandardError => e log(0, "ERROR: #{e.message}") @@ -111,8 +110,7 @@ class HookManagerDriver < OpenNebulaDriver m_key = 'RETRY' m_val = "#{command} #{params}" - send_string(@publisher, m_key, ZMQ::SNDMORE) - send_string(@publisher, m_val) + publish_message(@publisher, m_key, m_val) rescue StandardError => e log(0, "ERROR: #{e.message}") end @@ -170,13 +168,15 @@ class HookManagerDriver < OpenNebulaDriver end end - def send_string(socket, content, flag = nil) + def publish_message(socket, key, content) rc = 0 - if flag.nil? - rc = socket.send_string(content) - else - rc = socket.send_string(content, flag) + # Make sure only one thread publish on the socket at a time + @publisher_lock.synchronize do + # Using envelopes for splitting key/content + # http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes + rc = socket.send_string(key, ZMQ::SNDMORE) + rc = socket.send_string(content) unless rc < 0 end return unless rc < 0