mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-21 14:50:08 +03:00
parent
5f9bb93d51
commit
4e6b2d410c
@ -51,7 +51,7 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
# --------------------------------------------------------------------------
|
||||
DEFAULT_CONF = {
|
||||
:concurrency => 15,
|
||||
:threaded => false,
|
||||
:threaded => true,
|
||||
:retries => 0,
|
||||
:publisher_port => 2101,
|
||||
:logger_port => 2102,
|
||||
@ -61,7 +61,6 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
|
||||
def initialize(options)
|
||||
@options = DEFAULT_CONF.merge(options)
|
||||
@options[:concurrency] = 1 # Only on thread using the publisher socket
|
||||
|
||||
super('', @options)
|
||||
|
||||
@ -78,6 +77,9 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
@replier = context.socket(ZMQ::REP)
|
||||
@replier.bind("tcp://#{@options[:bind]}:#{@options[:logger_port]}")
|
||||
|
||||
# Lock to sync access to @publisher
|
||||
@publisher_lock = Mutex.new
|
||||
|
||||
Thread.new do
|
||||
receiver_thread
|
||||
end
|
||||
@ -91,10 +93,7 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
key(type, arg_xml).each do |key|
|
||||
m_key = "EVENT #{key}"
|
||||
|
||||
# Using envelopes for splitting key/val
|
||||
# http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes
|
||||
send_string(@publisher, m_key, ZMQ::SNDMORE)
|
||||
send_string(@publisher, arguments.flatten[0])
|
||||
publish_message(@publisher, m_key, arguments.flatten[0])
|
||||
end
|
||||
rescue StandardError => e
|
||||
log(0, "ERROR: #{e.message}")
|
||||
@ -111,8 +110,7 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
m_key = 'RETRY'
|
||||
m_val = "#{command} #{params}"
|
||||
|
||||
send_string(@publisher, m_key, ZMQ::SNDMORE)
|
||||
send_string(@publisher, m_val)
|
||||
publish_message(@publisher, m_key, m_val)
|
||||
rescue StandardError => e
|
||||
log(0, "ERROR: #{e.message}")
|
||||
end
|
||||
@ -170,13 +168,15 @@ class HookManagerDriver < OpenNebulaDriver
|
||||
end
|
||||
end
|
||||
|
||||
def send_string(socket, content, flag = nil)
|
||||
def publish_message(socket, key, content)
|
||||
rc = 0
|
||||
|
||||
if flag.nil?
|
||||
rc = socket.send_string(content)
|
||||
else
|
||||
rc = socket.send_string(content, flag)
|
||||
# Make sure only one thread publish on the socket at a time
|
||||
@publisher_lock.synchronize do
|
||||
# Using envelopes for splitting key/content
|
||||
# http://zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes
|
||||
rc = socket.send_string(key, ZMQ::SNDMORE)
|
||||
rc = socket.send_string(content) unless rc < 0
|
||||
end
|
||||
|
||||
return unless rc < 0
|
||||
|
Loading…
x
Reference in New Issue
Block a user