1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-22 13:33:52 +03:00

F #5191: implement OneFlow service purge done (#569)

This commit is contained in:
Alejandro Huertas Herrero 2020-12-18 13:32:49 +01:00 committed by GitHub
parent 5d066ea8f4
commit 58b79f114e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 146 additions and 75 deletions

View File

@ -55,12 +55,6 @@ CommandParser::CmdParser.new(ARGV) do
set :option, CommandParser::VERSION
set :option, CommandParser::HELP
DONE = {
:name => 'done',
:large => '--done',
:description => 'Show services in DONE state'
}
ARGS = {
:name => 'args',
:large => '--args arg1,arg2',
@ -114,7 +108,7 @@ CommandParser::CmdParser.new(ARGV) do
List the available services
EOT
command :list, list_desc, :options => OpenNebulaHelper::FORMAT + [DONE] do
command :list, list_desc, :options => OpenNebulaHelper::FORMAT do
helper.list_service_pool(helper.client(options), options)
end
@ -124,7 +118,7 @@ CommandParser::CmdParser.new(ARGV) do
Top the available services
EOT
command :top, top_desc, :options => [CLIHelper::DELAY, DONE] do
command :top, top_desc, :options => [CLIHelper::DELAY] do
Signal.trap('INT') { exit(-1) }
helper.top_service_pool(helper.client(options), options)
@ -403,4 +397,21 @@ CommandParser::CmdParser.new(ARGV) do
0
end
end
###
purge_done_desc = <<-EOT.unindent
Purge and delete services in DONE state
EOT
command :'purge-done', purge_done_desc do
client = helper.client(options)
response = client.post('/service_pool/purge_done', '')
if CloudClient.is_error?(response)
exit_with_code response.code.to_i, response.to_s
else
0
end
end
end

View File

@ -66,6 +66,10 @@
:vm_name_template: '$ROLE_NAME_$VM_NUMBER_(service_$SERVICE_ID)'
#:vn_name_template: '$ROLE_NAME(service_$SERVICE_ID)'
# Default page size when purging DONE services
:page_size: 10
#############################################################
# Auth
#############################################################

View File

@ -267,7 +267,9 @@ class EventManager
rc = subscriber.recv_string(key)
rc = subscriber.recv_string(content) if rc != -1
# rubocop:disable Style/GuardClause
if rc == -1 && ZMQ::Util.errno != ZMQ::EAGAIN
# rubocop:enable Style/GuardClause
next Log.error LOG_COMP, 'Error reading from subscriber.'
elsif rc == -1
Log.info LOG_COMP, "Timeout reached for VM #{nodes} =>"\
@ -294,19 +296,11 @@ class EventManager
return [true, rc_nodes] # (nodes.empty? && fail_nodes.empty?)
end
id = retrieve_id(key)
rc = check_nodes([id], state, lcm_state, subscriber)
states = { :state => state, :lcm_state => lcm_state }
rc = check_nodes_event(nodes, states, key, content, subscriber)
rc_nodes[:successful].merge!(rc[:successful])
rc_nodes[:failure].merge!(rc[:failure])
nodes.delete(id)
unsubscribe(id, state, lcm_state, subscriber)
(SUBSCRIBE_STATES + ['DONE']).each do |s|
unsubscribe(id, s, 'LCM_INIT', subscriber)
end
end
subscriber.close
@ -348,6 +342,7 @@ class EventManager
# rubocop:disable Style/GuardClause
if rc == -1 && ZMQ::Util.errno != ZMQ::EAGAIN
# rubocop:enable Style/GuardClause
next Log.error LOG_COMP, 'Error reading from subscriber.'
elsif rc == -1
Log.info LOG_COMP, "Timeout reached for VM #{nodes} to report"
@ -374,29 +369,46 @@ class EventManager
return [true, rc_nodes] # (nodes.empty? && fail_nodes.empty?)
end
# rubocop:enable Style/GuardClause
rc = check_nodes_event(nodes, {}, key, content, subscriber)
id = retrieve_id(key)
rc = check_nodes_report([id])
rc_nodes[:successful].merge!(rc[:successful])
rc_nodes[:failure].merge!(rc[:failure])
unless rc[:successful].empty?
nodes.delete(id)
(SUBSCRIBE_STATES + ['DONE']).each do |s|
unsubscribe(id, s, 'LCM_INIT', subscriber)
end
if rc.is_a? Array
next if !rc[1].match('READY=YES') || !nodes.include?(rc[0])
else
rc_nodes[:successful].merge!(rc[:successful])
rc_nodes[:failure].merge!(rc[:failure])
next
end
xml = Nokogiri::XML(Base64.decode64(content))
Log.info LOG_COMP, "Node #{rc[0]} reported ready"
id = xml.xpath(
(SUBSCRIBE_STATES + ['DONE']).each do |s|
unsubscribe(rc[0], s, 'LCM_INIT', subscriber)
end
nodes.delete(rc[0])
rc_nodes[:successful][rc[0]] = false
end
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT API one.vm.update 1')
subscriber.close
[true, rc_nodes]
end
def check_nodes_event(nodes, states, key, content, subscriber)
xml = Nokogiri::XML(Base64.decode64(content))
# Read states we are waiting for
state = states[:state]
lcm_state = states[:lcm_state]
if key.include?('one.vm.update')
id = Integer(xml.xpath(
'//PARAMETER[POSITION=2 and TYPE=\'IN\']/VALUE'
).text.to_i
).text)
ready = xml.xpath(
'//PARAMETER[POSITION=3 and TYPE=\'IN\']/VALUE'
).text
@ -408,23 +420,42 @@ class EventManager
ready.gsub!(' ', '')
# rubocop:enable Style/StringLiterals
next if !ready.match('READY=YES') || !nodes.include?(id)
[id, ready]
else
# Read information from hook message
id = retrieve_id(key)
h_state = xml.xpath('//HOOK_MESSAGE/STATE').text
h_lcm_state = xml.xpath('//HOOK_MESSAGE/LCM_STATE').text
Log.info LOG_COMP, "Node #{id} reported ready"
rc_nodes = { :successful => {}, :failure => {} }
(SUBSCRIBE_STATES + ['DONE']).each do |s|
unsubscribe(id, s, 'LCM_INIT', subscriber)
Log.info LOG_COMP,
"Node #{id} reached (#{h_state}, #{h_lcm_state})"
if h_state == 'DONE'
rc_nodes[:successful][id] = true
elsif h_state == state && h_lcm_state == lcm_state
rc_nodes[:successful][id] = false
elsif SUBSCRIBE_STATES.include?(vm_state)
rc_nodes[:successful][id] = false
end
nodes.delete(id)
rc_nodes[:successful][id] = false
if FAILURE_STATES.include? h_lcm_state
Log.error LOG_COMP, "Node #{id} is in FAILURE state"
rc_nodes[:failure][id] = false
else
unsubscribe(id, state, lcm_state, subscriber)
(SUBSCRIBE_STATES + ['DONE']).each do |s|
unsubscribe(id, s, 'LCM_INIT', subscriber)
end
nodes.delete(id)
end
rc_nodes
end
subscriber.setsockopt(ZMQ::UNSUBSCRIBE, 'EVENT API one.vm.update 1')
subscriber.close
[true, rc_nodes]
end
def check_nodes(nodes, state, lcm_state, subscriber)

View File

@ -295,10 +295,7 @@ class ServiceLCM
# If shutdown roles is empty, asume the service is in DONE and exit
if roles.empty?
if service.all_roles_done?
service.set_state(Service::STATE['DONE'])
service.update
end
service.delete
break
end
@ -594,7 +591,9 @@ class ServiceLCM
"Virtual Networks #{rc}"
end
service.set_state(Service::STATE['DONE'])
service.delete
break
elsif service.strategy == 'straight'
set_deploy_strategy(service)

View File

@ -300,28 +300,6 @@ module OpenNebula
end
end
# Delete the service. All the VMs are also deleted from OpenNebula.
# @return [nil, OpenNebula::Error] nil in case of success, Error
# otherwise
def delete
networks = JSON.parse(self['TEMPLATE/BODY'])['networks_values']
networks.each do |net|
next unless net[net.keys[0]].key? 'template_id'
net_id = net[net.keys[0]]['id'].to_i
rc = OpenNebula::VirtualNetwork
.new_with_id(net_id, @client).delete
if OpenNebula.is_error?(rc)
log_info("Error deleting vnet #{net_id}: #{rc}")
end
end if networks
super()
end
# Retrieves the information of the Service and all its Nodes.
#
# @return [nil, OpenNebula::Error] nil in case of success, Error

View File

@ -83,6 +83,7 @@ conf[:vm_name_template] ||= DEFAULT_VM_NAME_TEMPLATE
conf[:wait_timeout] ||= 30
conf[:concurrency] ||= 10
conf[:auth] = 'opennebula'
conf[:page_size] ||= 10
set :bind, conf[:host]
set :port, conf[:port]
@ -388,6 +389,29 @@ post '/service/:id/scale' do
status 204
end
##############################################################################
# Service Pool
##############################################################################
post '/service_pool/purge_done' do
service_pool = OpenNebula::ServicePool.new(nil, @client)
rc = service_pool.info
if OpenNebula.is_error?(rc)
return internal_error(rc.message, one_error_to_http(rc.errno))
end
Thread.new do
service_pool.each_page(conf[:page_size]) do |service|
next unless service.state == Service::STATE['DONE']
service.delete
end
end
status 204
end
##############################################################################
# Service Template
##############################################################################

View File

@ -34,7 +34,7 @@ module OpenNebula
end
# ServicePool class
class ServicePool
class ServicePool < Pool
# rubocop:disable Style/ClassVars
@@mutex = Mutex.new
@ -53,6 +53,13 @@ module OpenNebula
@cloud_auth = cloud_auth
@client = client
@one_pool = nil
if @client
info = Nokogiri::XML(@client.call('user.info', -1))
@user_id = Integer(info.xpath('/USER/ID').text)
end
super('DOCUMENT_POOL', 'DOCUMENT', @client)
end
def client
@ -93,6 +100,23 @@ module OpenNebula
@one_pool.each(&block)
end
# Iterates over pool pages
# size:: nil => default page size
# > 0 => page size
# state state of objects
def each_page(size)
loop_page(size, Service::DOCUMENT_TYPE, false) do |element, page|
page.each("//#{element}") do |obj|
service = Service.new_with_id(obj['ID'], @client)
service.info
yield(service)
end
size
end
end
# Retrieves a Service element from OpenNebula. The Service::info()
# method is called
#

View File

@ -28,7 +28,7 @@ module OpenNebula
attr_reader :element_name
PAGINATED_POOLS=%w{VM_POOL IMAGE_POOL TEMPLATE_POOL VN_POOL
SECGROUP_POOL}
SECGROUP_POOL DOCUMENT_POOL}
protected
#pool:: _String_ XML name of the root element