1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-12 09:17:41 +03:00

F #5256: enable multithreading for one_hm (#812)

(cherry picked from commit 4e6b2d410c)
This commit is contained in:
Christian González 2021-02-24 09:34:44 +01:00 committed by Ruben S. Montero
parent c3d0e70f34
commit f41ed0cda2
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87

View File

@ -54,7 +54,7 @@ class HookManagerDriver < OpenNebulaDriver
# --------------------------------------------------------------------------
DEFAULT_CONF = {
:concurrency => 15,
:threaded => false,
:threaded => true,
:retries => 0,
:publisher_port => 2101,
:logger_port => 2102,
@ -64,7 +64,6 @@ class HookManagerDriver < OpenNebulaDriver
def initialize(options)
@options = DEFAULT_CONF.merge(options)
@options[:concurrency] = 1 # Only on thread using the publisher socket
super('', @options)
@ -81,6 +80,9 @@ class HookManagerDriver < OpenNebulaDriver
@replier = context.socket(ZMQ::REP)
@replier.bind("tcp://#{@options[:bind]}:#{@options[:logger_port]}")
# Lock to sync access to @publisher
@publisher_lock = Mutex.new
Thread.new do
receiver_thread
end
@ -94,10 +96,7 @@ class HookManagerDriver < OpenNebulaDriver
key(type, arg_xml).each do |key|
m_key = "EVENT #{key}"
# Using envelopes for splitting key/val
# http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes
send_string(@publisher, m_key, ZMQ::SNDMORE)
send_string(@publisher, arguments.flatten[0])
publish_message(@publisher, m_key, arguments.flatten[0])
end
rescue StandardError => e
log(0, "ERROR: #{e.message}")
@ -114,8 +113,7 @@ class HookManagerDriver < OpenNebulaDriver
m_key = 'RETRY'
m_val = "#{command} #{params}"
send_string(@publisher, m_key, ZMQ::SNDMORE)
send_string(@publisher, m_val)
publish_message(@publisher, m_key, m_val)
rescue StandardError => e
log(0, "ERROR: #{e.message}")
end
@ -173,13 +171,15 @@ class HookManagerDriver < OpenNebulaDriver
end
end
def send_string(socket, content, flag = nil)
def publish_message(socket, key, content)
rc = 0
if flag.nil?
rc = socket.send_string(content)
else
rc = socket.send_string(content, flag)
# Make sure only one thread publish on the socket at a time
@publisher_lock.synchronize do
# Using envelopes for splitting key/content
# http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes
rc = socket.send_string(key, ZMQ::SNDMORE)
rc = socket.send_string(content) unless rc < 0
end
return unless rc < 0