1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-16 22:50:10 +03:00

M #-: subscribe to service changes (#4753)

This commit is contained in:
Ruben S. Montero 2020-05-18 18:43:21 +02:00 committed by GitHub
commit 392f0c00a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 63 additions and 165 deletions

View File

@ -238,7 +238,6 @@ public:
resched = do_sched ? 1 : 0;
};
// -------------------------------------------------------------------------
// Log & Print
// -------------------------------------------------------------------------

View File

@ -58,7 +58,6 @@ class ServiceLCM
}
@event_manager = EventManager.new(em_conf).am
@wd = ServiceWD.new(client, em_conf)
# Register Action Manager actions
@am.register_action(ACTIONS['DEPLOY_CB'],
@ -90,6 +89,11 @@ class ServiceLCM
Thread.new { catch_up(client) }
Thread.new do
wd = ServiceWD.new(client, em_conf)
wd.start(@srv_pool)
end
Thread.new do
auto_scaler = ServiceAutoScaler.new(@srv_pool,
client,
@ -239,9 +243,6 @@ class ServiceLCM
if service.all_roles_running?
service.set_state(Service::STATE['RUNNING'])
service.update
# start watchdog
@wd.start(service.id, service.roles)
end
# If there is no node in PENDING the service is not modified.
@ -287,9 +288,6 @@ class ServiceLCM
)
end
# stop watchdog
@wd.stop(service.id)
set_deploy_strategy(service)
roles = service.roles_shutdown
@ -343,9 +341,6 @@ class ServiceLCM
)
end
# stop watchdog
@wd.stop(service.id)
role = service.roles[role_name]
if role.nil?
@ -482,9 +477,6 @@ class ServiceLCM
if service.all_roles_running?
service.set_state(Service::STATE['RUNNING'])
# start watching the service
@wd.start(service.id, service.roles)
elsif service.strategy == 'straight'
set_deploy_strategy(service)
@ -655,9 +647,6 @@ class ServiceLCM
service.set_state(Service::STATE['RUNNING'])
service.roles[role_name].set_state(Role::STATE['RUNNING'])
# start watching the service
@wd.start(service.id, service.roles)
service.update
end
@ -694,8 +683,6 @@ class ServiceLCM
Log.info 'WD',
"Update #{service_id}:#{role_name} " \
"cardinality to #{cardinality}"
@wd.update(service.id, role_name, node)
end
Log.error 'WD', rc.message if OpenNebula.is_error?(rc)
@ -728,13 +715,6 @@ class ServiceLCM
@srv_pool.each do |service|
recover_action(client, service.id) if service.transient_state?
service.info
if Service::STATE['RUNNING'] == service.state ||
Service::STATE['WARNING'] == service.state
@wd.start(service.id, service.roles)
end
end
end

View File

@ -45,7 +45,7 @@ class ServiceAutoScaler
# fill service roles information
service.info_roles
next if service.state == Service::STATE['DONE']
next if service.state != Service::STATE['RUNNING']
Log.info LOG_COMP,
'Checking policies for ' \

View File

@ -15,7 +15,9 @@
# limitations under the License. #
#--------------------------------------------------------------------------- #
require 'base64'
require 'ffi-rzmq'
require 'nokogiri'
require 'EventManager'
# Service watchdog class
@ -51,98 +53,23 @@ class ServiceWD
@cloud_auth = @conf[:cloud_auth]
@wait_timeout = @cloud_auth.conf[:wait_timeout]
@client = client
@services_nodes = {}
@services_threads = {}
@nodes_mutex = Mutex.new
@threads_mutex = Mutex.new
end
# Start service WD thread
# Start services WD
#
# @param service_id [Integer] Service ID to watch
# @param roles [Array] Service roles with its nodes
def start(service_id, roles)
Log.info LOG_COMP, "Start watching #{service_id}"
# @param service_pool [ServicePool] All services to check
def start(service_pool)
Log.info LOG_COMP, 'Start watch dog'
@threads_mutex.synchronize do
@services_threads[service_id] = Thread.new do
start_watching(service_id, roles)
end
end
end
# Stop service WD thread
#
# @param service_id [Integer] Service ID to stop
def stop(service_id)
Log.info LOG_COMP, "Stop watching #{service_id}"
@threads_mutex.synchronize do
return if @services_threads[service_id].nil?
@services_threads[service_id].terminate
@services_threads.delete(service_id)
end
stop_watching(service_id)
end
# Update service nodes
#
# @param service_id [Integer] Service ID to update
# @param role_name [String] Role to update
# @param node [Integer] VM ID to delete
def update(service_id, role_name, node)
subscriber = gen_subscriber
unsubscribe(node, subscriber)
@nodes_mutex.synchronize do
return if @services_nodes[service_id].nil?
return if @services_nodes[service_id][role_name].nil?
@services_nodes[service_id][role_name].delete(node)
return unless @services_nodes[service_id][role_name].empty?
# if all role nodes have been deleted, delete the rol
@services_nodes[service_id].delete(role_name)
end
end
private
# Start watching service roles nodes
#
# @param service_id [Integer] Service ID to watch
# @param roles [Array] Service roles with its nodes
def start_watching(service_id, roles)
@nodes_mutex.synchronize do
@services_nodes[service_id] = {}
roles.each do |name, role|
@services_nodes[service_id][name] = {}
@services_nodes[service_id][name] = role.nodes_ids
end
end
service_pool.info_all
# check that all nodes are in RUNNING state, if not, notify
check_roles_state(client, service_id, roles)
check_roles_state(client, service_pool)
# subscribe to all nodes
subscriber = gen_subscriber
@nodes_mutex.synchronize do
@services_nodes[service_id].each do |_, nodes|
nodes.each do |node|
subscribe(node, subscriber)
end
end
end
subscribe(subscriber)
key = ''
content = ''
@ -156,19 +83,21 @@ class ServiceWD
Log.error LOG_COMP, 'Error reading from subscriber.'
end
# key format: EVENT VM VM_ID/STATE/LCM_STATE
# key format: EVENT SERVICE SERVICE_ID
next if key.nil?
split_key = key.split
# if there is no data skip
next if split_key[2].nil?
next if content.nil?
split_key = key.split[2].split('/')
node = split_key[0].to_i
state = split_key[1]
lcm_state = split_key[2]
role_name = find_by_id(service_id, node)
xml = Nokogiri::XML(Base64.decode64(content))
service_id = split_key[2]
node = xml.xpath('/HOOK_MESSAGE/VM/ID').text.to_i
state = xml.xpath('/HOOK_MESSAGE/STATE').text
lcm_state = xml.xpath('/HOOK_MESSAGE/LCM_STATE').text
role_name = xml.xpath('/HOOK_MESSAGE/VM//ROLE_NAME').text
# if the VM is not from the service skip
next if role_name.nil?
@ -181,24 +110,18 @@ class ServiceWD
end
end
# Stop watching service roles nodes
#
# @service_id [Integer] Service ID to stop watching
def stop_watching(service_id)
# Stop service WD thread
def stop
Log.info LOG_COMP, 'Stop watch dog'
# unsubscribe from all nodes
subscriber = gen_subscriber
@nodes_mutex.synchronize do
@services_nodes[service_id].each do |_, nodes|
nodes.each do |node|
unsubscribe(node, subscriber)
end
end
@services_nodes.delete(service_id)
end
unsubscribe(subscriber)
end
private
# Get OpenNebula client
def client
# If there's a client defined use it
@ -221,29 +144,30 @@ class ServiceWD
# Subscribe to VM state changes
#
# @param vm_id [Integer] VM ID to subscribe
# @param subscriber [ZMQ] ZMQ subscriber object
def subscribe(vm_id, subscriber)
subscriber.setsockopt(ZMQ::SUBSCRIBE, "EVENT VM #{vm_id}")
# @param subscriber [ZMQ] ZMQ subscriber object
def subscribe(subscriber)
subscriber.setsockopt(ZMQ::SUBSCRIBE, 'EVENT SERVICE')
end
# Unsubscribe from VM state changes
#
# @param vm_id [Integer] VM ID to unsubscribe
# @param subscriber [ZMQ] ZMQ subscriber object
def unsubscribe(vm_id, subscriber)
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, "EVENT VM #{vm_id}")
# @param subscriber [ZMQ] ZMQ subscriber object
def unsubscribe(subscriber)
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT SERVICE')
end
# Check service roles state
#
# @param client [OpenNebula::Client] Client to make API calls
# @param service_id [Integer] Service ID to check
# @param roles [Array] Service roles with its nodes
def check_roles_state(client, service_id, roles)
roles.each do |name, role|
role.nodes_ids.each do |node|
check_role_state(client, service_id, name, node)
# @param client [OpenNebula::Client] Client to make API calls
# @param service_pool [ServicePool] All services to check
def check_roles_state(client, service_pool)
service_pool.each do |service|
service.info
service.roles.each do |name, role|
role.nodes_ids.each do |node|
check_role_state(client, service.id, name, node)
end
end
end
end
@ -252,24 +176,6 @@ class ServiceWD
# HELPERS
############################################################################
# Find role name for a given VM
#
# @param service_id [Integer] Service ID to get role from
# @param node [Integer] VM ID
#
# @return nil if don't find, role_name if found
def find_by_id(service_id, node)
ret = nil
@nodes_mutex.synchronize do
ret = @services_nodes[service_id].find do |_, nodes|
nodes.include?(node)
end
end
ret[0] unless ret.nil?
end
# Check role state
#
# @param client [OpenNebula::Client] Client to make API calls

View File

@ -30,7 +30,7 @@ string * HookStateVM::format_message(VirtualMachine * vm)
{
std::ostringstream oss;
std::string vm_xml;
std::string state, lcm_state;
std::string state, lcm_state, service_id;
oss << "<HOOK_MESSAGE>"
<< "<HOOK_TYPE>STATE</HOOK_TYPE>"
@ -39,6 +39,13 @@ string * HookStateVM::format_message(VirtualMachine * vm)
<< "<LCM_STATE>" << VirtualMachine::lcm_state_to_str(lcm_state, vm->get_lcm_state()) << "</LCM_STATE>"
<< "<RESOURCE_ID>" << vm->get_oid() << "</RESOURCE_ID>";
vm->get_user_template_attribute("SERVICE_ID", service_id);
if ( !service_id.empty() )
{
oss << "<SERVICE_ID>" << service_id << "</SERVICE_ID>";
}
if ( vm->hasHistory() )
{
oss << "<REMOTE_HOST>" << vm->get_hostname() << "</REMOTE_HOST>";

View File

@ -146,9 +146,15 @@ class HookManagerDriver < OpenNebulaDriver
state = xml.xpath('//STATE')[0].text
lcm_state = xml.xpath('//LCM_STATE')[0].text if obj == 'VM'
resource_id = xml.xpath('//RESOURCE_ID')[0].text
service_id = xml.xpath('//SERVICE_ID')[0]
service_id = service_id.text if service_id
["#{obj} #{resource_id}/#{state}/#{lcm_state} ",
"STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "]
ret = ["#{obj} #{resource_id}/#{state}/#{lcm_state} ",
"STATE #{obj}/#{state}/#{lcm_state}/#{resource_id} "]
ret << "SERVICE #{service_id} " if service_id
ret
else
['']
end