1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-21 14:50:08 +03:00

B #5814: OneFlow server creates clients as needed

Oneflow do not caches OpenNebulaClient objects and generate a new one
whenever an user operation needs to be made.

* onflow-server sends the @username down to the flow components, so it
  can create serveradmin tokens when needed in callbacks. Athentication is
  performed in the flow-server

* Direct (non-cb) actions receive a fresh client each time to execute
  the operation
This commit is contained in:
Christian González 2022-10-05 11:46:28 +02:00 committed by Ruben S. Montero
parent 1260f92e05
commit 8980e1b0ff
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
6 changed files with 593 additions and 648 deletions

View File

@ -101,7 +101,7 @@ class EventManager
# @param [Service] service the service
# @param [Role] the role which contains the VMs
# @param [Node] nodes the list of nodes (VMs) to wait for
def wait_deploy_action(client, service_id, role_name, nodes, report)
def wait_deploy_action(external_user, service_id, role_name, nodes, report)
if report
Log.info LOG_COMP, "Waiting #{nodes} to report ready"
rc = wait_report_ready(nodes)
@ -113,14 +113,14 @@ class EventManager
if rc[0]
@lcm.trigger_action(:deploy_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:deploy_failure_cb,
service_id,
client,
external_user,
service_id,
role_name)
end
@ -128,10 +128,10 @@ class EventManager
# Wait for networks to e ready
#
# @param client [OpenNebula::Client] Client to perform requests
# @param service_id [Integer] Service ID
# @param networks [Array] Network IDs to wait until ready
def wait_deploy_nets_action(client, service_id, networks)
# @param external_user [String] External user to impersonate for performing the action
# @param service_id [Integer] Service ID
# @param networks [Array] Network IDs to wait until ready
def wait_deploy_nets_action(external_user, service_id, networks)
Log.info LOG_COMP, "Waiting networks #{networks} to be (READY)"
rc = wait_nets(networks, 'READY')
@ -141,15 +141,15 @@ class EventManager
action = :deploy_nets_failure_cb
end
@lcm.trigger_action(action, service_id, client, service_id)
@lcm.trigger_action(action, service_id, external_user, service_id)
end
# Wait for networks to e ready
#
# @param client [OpenNebula::Client] Client to perform requests
# @param external_user [String] External user to impersonate for performing the action
# @param service_id [Integer] Service ID
# @param networks [Array] Network IDs to wait until ready
def wait_undeploy_nets_action(client, service_id, networks)
def wait_undeploy_nets_action(external_user, service_id, networks)
Log.info LOG_COMP, "Waiting networks #{networks} to be (DONE)"
rc = wait_nets(networks, 'DONE')
@ -159,28 +159,29 @@ class EventManager
action = :undeploy_nets_failure_cb
end
@lcm.trigger_action(action, service_id, client, service_id)
@lcm.trigger_action(action, service_id, external_user, service_id)
end
# Wait for nodes to be in DONE
# @param [String] External user to impersonate for performing the action
# @param [service_id] the service id
# @param [role_name] the role name of the role which contains the VMs
# @param [nodes] the list of nodes (VMs) to wait for
def wait_undeploy_action(client, service_id, role_name, nodes)
def wait_undeploy_action(external_user, service_id, role_name, nodes)
Log.info LOG_COMP, "Waiting #{nodes} to be (DONE, LCM_INIT)"
rc = wait(nodes, 'DONE', 'LCM_INIT')
if rc[0]
@lcm.trigger_action(:undeploy_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:undeploy_failure_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
@ -189,11 +190,12 @@ class EventManager
# Wait for nodes to be in RUNNING if OneGate check required it will trigger
# another action after VMs are RUNNING
# @param [String] External user to impersonate for performing the action
# @param [Service] service the service
# @param [Role] the role which contains the VMs
# @param [Node] nodes the list of nodes (VMs) to wait for
# @param [Bool] up true if scalling up false otherwise
def wait_scaleup_action(client, service_id, role_name, nodes, report)
def wait_scaleup_action(external_user, service_id, role_name, nodes, report)
if report
Log.info LOG_COMP, "Waiting #{nodes} to report ready"
rc = wait_report_ready(nodes)
@ -205,20 +207,20 @@ class EventManager
if rc[0]
@lcm.trigger_action(:scaleup_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:scaleup_failure_cb,
service_id,
client,
external_user,
service_id,
role_name)
end
end
def wait_scaledown_action(client, service_id, role_name, nodes)
def wait_scaledown_action(external_user, service_id, role_name, nodes)
Log.info LOG_COMP, "Waiting #{nodes} to be (DONE, LCM_INIT)"
rc = wait(nodes, 'DONE', 'LCM_INIT')
@ -226,21 +228,21 @@ class EventManager
if rc[0]
@lcm.trigger_action(:scaledown_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:scaledown_failure_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
end
end
def wait_add_action(client, service_id, role_name, nodes, report)
def wait_add_action(external_user, service_id, role_name, nodes, report)
if report
Log.info LOG_COMP, "Waiting #{nodes} to report ready"
rc = wait_report_ready(nodes)
@ -252,38 +254,39 @@ class EventManager
if rc[0]
@lcm.trigger_action(:add_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:add_failure_cb,
service_id,
client,
external_user,
service_id,
role_name)
end
end
# Wait for nodes to be in DONE
# @param [String] External user to impersonate for performing the action
# @param [service_id] the service id
# @param [role_name] the role name of the role which contains the VMs
# @param [nodes] the list of nodes (VMs) to wait for
def wait_remove_action(client, service_id, role_name, nodes)
def wait_remove_action(external_user, service_id, role_name, nodes)
Log.info LOG_COMP, "Waiting #{nodes} to be (DONE, LCM_INIT)"
rc = wait(nodes, 'DONE', 'LCM_INIT')
if rc[0]
@lcm.trigger_action(:remove_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:remove_failure_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
@ -291,10 +294,11 @@ class EventManager
end
# Wait for nodes to be in DONE
# @param [String] External user to impersonate for performing the action
# @param [service_id] the service id
# @param [role_name] the role name of the role which contains the VMs
# @param [nodes] the list of nodes (VMs) to wait for
def wait_cooldown_action(client, service_id, role_name, cooldown_time)
def wait_cooldown_action(external_user, service_id, role_name, cooldown_time)
Log.info LOG_COMP, "Waiting #{cooldown_time}s for cooldown for " \
"service #{service_id} and role #{role_name}."
@ -302,32 +306,34 @@ class EventManager
@lcm.trigger_action(:cooldown_cb,
service_id,
client,
external_user,
service_id,
role_name)
end
# Wait for nodes to be in HOLD
# @param [String] External user to impersonate for performing the action
# @param [service_id] the service id
# @param [role_name] the role name of the role which contains the VMs
# @param [nodes] the list of nodes (VMs) to wait for
def wait_hold_action(client, service_id, role_name, nodes)
def wait_hold_action(external_user, service_id, role_name, nodes)
Log.info LOG_COMP, "Waiting #{nodes} to be (HOLD, LCM_INIT)"
wait(nodes, 'HOLD', 'LCM_INIT')
@lcm.trigger_action(:hold_cb,
service_id,
client,
external_user,
service_id,
role_name)
end
# Wait for nodes to be in RUNNING if OneGate check required it will trigger
# another action after VMs are RUNNING
# @param [String] External user to impersonate for performing the action
# @param [Service] service the service
# @param [Role] the role which contains the VMs
# @param [Node] nodes the list of nodes (VMs) to wait for
def wait_release_action(client, service_id, role_name, nodes, report)
def wait_release_action(external_user, service_id, role_name, nodes, report)
if report
Log.info LOG_COMP, "Waiting #{nodes} to report ready"
rc = wait_report_ready(nodes)
@ -339,14 +345,14 @@ class EventManager
if rc[0]
@lcm.trigger_action(:release_cb,
service_id,
client,
external_user,
service_id,
role_name,
rc[1])
else
@lcm.trigger_action(:deploy_failure_cb,
service_id,
client,
external_user,
service_id,
role_name)
end

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ class ServiceAutoScaler
loop do
@srv_pool.info
vm_pool = VirtualMachinePool.new(client)
vm_pool = VirtualMachinePool.new(@cloud_auth.client)
# -2 to get all resources, 0 just to get last record
monitoring = vm_pool.monitoring_xml(-2, 0)
@ -68,12 +68,6 @@ class ServiceAutoScaler
private
# Get OpenNebula client
def client
# If not, get one via cloud_auth
@cloud_auth.client
end
# If a role needs to scale, its cardinality is updated, and its state is set
# to SCALING. Only one role is set to scale.
#
@ -92,7 +86,7 @@ class ServiceAutoScaler
# if diff is zero, cooldown doesn't matter
cooldown_duration = nil if diff == 0
@lcm.update_role_policies(client,
@lcm.update_role_policies(nil,
service.id,
name,
policies,
@ -106,7 +100,7 @@ class ServiceAutoScaler
Log.info LOG_COMP,
"Role #{name} needs to scale #{diff} nodes", service.id
@lcm.scale_action(client,
@lcm.scale_action(nil,
service.id,
name,
role.cardinality + diff,

View File

@ -67,7 +67,7 @@ class ServiceWD
service_pool.info_all
# check that all nodes are in RUNNING state, if not, notify
check_roles_state(client, service_pool)
check_roles_state(service_pool)
# subscribe to all nodes
subscriber = gen_subscriber
@ -117,7 +117,7 @@ class ServiceWD
states[:state] = state
states[:lcm] = lcm_state
check_role_state(client, service_id, role_name, node, states)
check_role_state(service_id, role_name, node, states)
end
end
@ -141,7 +141,7 @@ class ServiceWD
service.roles.each do |name, role|
role.nodes_ids.each do |node|
check_role_state(client, service.id, name, node)
check_role_state(service.id, name, node)
end
end
end
@ -157,12 +157,6 @@ class ServiceWD
private
# Get OpenNebula client
def client
# If not, get one via cloud_auth
@cloud_auth.client
end
# Get ZMQ subscriber object
def gen_subscriber
subscriber = @context.socket(ZMQ::SUB)
@ -190,9 +184,8 @@ class ServiceWD
# Check service roles state
#
# @param client [OpenNebula::Client] Client to make API calls
# @param service_pool [ServicePool] All services to check
def check_roles_state(client, service_pool)
def check_roles_state(service_pool)
service_pool.each do |service|
service.info
@ -218,14 +211,14 @@ class ServiceWD
# If there are no VM, the role should be running
@lcm.trigger_action(:running_wd_cb,
service.id,
client,
nil,
service.id,
name,
[])
end
nodes_ids.each do |node|
check_role_state(client, service.id, name, node)
check_role_state(service.id, name, node)
end
end
end
@ -237,15 +230,14 @@ class ServiceWD
# Check role state
#
# @param client [OpenNebula::Client] Client to make API calls
# @param service_id [Integer] Service ID to check
# @param role_name [String] Role to check
# @param node [Integer] VM ID
# @param states [Hash] node state and node lcm state
def check_role_state(client, service_id, role_name, node, states = nil)
def check_role_state(service_id, role_name, node, states = nil)
# if don't have the state, query it by creating a VM object
if states.nil?
vm = OpenNebula::VirtualMachine.new_with_id(node, client)
vm = OpenNebula::VirtualMachine.new_with_id(node, @cloud_auth.client)
vm.info
vm_state = OpenNebula::VirtualMachine::VM_STATE[vm.state]
@ -270,7 +262,7 @@ class ServiceWD
# execute callback
@lcm.trigger_action(action,
service_id,
client,
nil,
service_id,
role_name,
node)

View File

@ -151,10 +151,9 @@ before do
auth = Rack::Auth::Basic::Request.new(request.env)
if auth.provided? && auth.basic?
username, password = auth.credentials
@username = cloud_auth.auth(request.env)
@client = OpenNebula::Client.new("#{username}:#{password}",
conf[:one_xmlrpc])
error 401, 'Invalid credentials' if @username.nil?
else
error 401, 'A username and password must be provided'
end
@ -211,7 +210,7 @@ GENERAL_EC = 500 # general error
##############################################################################
# TODO: make thread number configurable?
lcm = ServiceLCM.new(@client, conf[:concurrency], cloud_auth, conf[:retries])
lcm = ServiceLCM.new(conf[:concurrency], cloud_auth, conf[:retries])
##############################################################################
# Service
@ -219,7 +218,7 @@ lcm = ServiceLCM.new(@client, conf[:concurrency], cloud_auth, conf[:retries])
get '/service' do
# Read-only object
service_pool = OpenNebula::ServicePool.new(nil, @client)
service_pool = OpenNebula::ServicePool.new(nil, cloud_auth.client(@username))
rc = service_pool.info
if OpenNebula.is_error?(rc)
@ -232,7 +231,7 @@ get '/service' do
end
get '/service/:id' do
service = Service.new_with_id(params[:id], @client)
service = Service.new_with_id(params[:id], cloud_auth.client(@username))
rc = service.info
if OpenNebula.is_error?(rc)
@ -246,7 +245,7 @@ end
delete '/service/:id' do
# Read-only object
service = OpenNebula::Service.new_with_id(params[:id], @client)
service = OpenNebula::Service.new_with_id(params[:id], cloud_auth.client(@username))
rc = service.info
if OpenNebula.is_error?(rc)
@ -254,7 +253,7 @@ delete '/service/:id' do
end
# Starts service undeploying async
rc = lcm.undeploy_action(@client, service.id)
rc = lcm.undeploy_action(@username, service.id)
if OpenNebula.is_error?(rc)
return internal_error(rc.message, one_error_to_http(rc.errno))
@ -270,16 +269,16 @@ post '/service/:id/action' do
case action['perform']
when 'recover'
if opts && opts['delete']
rc = lcm.recover_action(@client, params[:id], true)
rc = lcm.recover_action(@username, params[:id], true)
else
rc = lcm.recover_action(@client, params[:id])
rc = lcm.recover_action(@username, params[:id])
end
when 'chown'
if opts && opts['owner_id']
u_id = opts['owner_id'].to_i
g_id = (opts['group_id'] || -1).to_i
rc = lcm.chown_action(@client, params[:id], u_id, g_id)
rc = lcm.chown_action(@username, params[:id], u_id, g_id)
else
rc = OpenNebula::Error.new("Action #{action['perform']}: " \
'You have to specify a UID')
@ -288,34 +287,34 @@ post '/service/:id/action' do
if opts && opts['group_id']
g_id = opts['group_id'].to_i
rc = lcm.chown_action(@client, params[:id], -1, g_id)
rc = lcm.chown_action(@username, params[:id], -1, g_id)
else
rc = OpenNebula::Error.new("Action #{action['perform']}: " \
'You have to specify a GID')
end
when 'chmod'
if opts && opts['octet']
rc = lcm.chmod_action(@client, params[:id], opts['octet'])
rc = lcm.chmod_action(@username, params[:id], opts['octet'])
else
rc = OpenNebula::Error.new("Action #{action['perform']}: " \
'You have to specify an OCTET')
end
when 'rename'
if opts && opts['name']
rc = lcm.rename_action(@client, params[:id], opts['name'])
rc = lcm.rename_action(@username, params[:id], opts['name'])
else
rc = OpenNebula::Error.new("Action #{action['perform']}: " \
'You have to specify a name')
end
when 'release'
rc = lcm.release_action(@client, params[:id])
rc = lcm.release_action(@username, params[:id])
when *Role::SCHEDULE_ACTIONS
# Use defaults only if one of the options is supplied
opts['period'] ||= conf[:action_period]
opts['number'] ||= conf[:action_number]
rc = lcm.service_sched_action(@client,
rc = lcm.service_sched_action(@username,
params[:id],
action['perform'],
opts['period'],
@ -368,7 +367,7 @@ put '/service/:id' do
return internal_error(e.message, VALIDATION_EC)
end
rc = lcm.service_update(@client, params[:id], new_template, append)
rc = lcm.service_update(@username, params[:id], new_template, append)
if OpenNebula.is_error?(rc)
return internal_error(rc.message, one_error_to_http(rc.errno))
@ -409,7 +408,7 @@ post '/service/:id/role/:role_name/action' do
opts['period'] ||= conf[:action_period]
opts['number'] ||= conf[:action_number]
rc = lcm.sched_action(@client,
rc = lcm.sched_action(@username,
params[:id],
params[:role_name],
action['perform'],
@ -427,7 +426,7 @@ end
post '/service/:id/scale' do
call_body = JSON.parse(request.body.read)
rc = lcm.scale_action(@client,
rc = lcm.scale_action(@username,
params[:id],
call_body['role_name'],
call_body['cardinality'].to_i,
@ -456,9 +455,9 @@ post '/service/:id/role_action' do
return internal_error(e.message, VALIDATION_EC)
end
rc = lcm.add_role_action(@client, params[:id], json_template)
rc = lcm.add_role_action(@username, params[:id], json_template)
when 'remove_role'
rc = lcm.remove_role_action(@client, params[:id], opts['role'])
rc = lcm.remove_role_action(@username, params[:id], opts['role'])
else
rc = OpenNebula::Error.new(
"Action #{action['perform']} not supported"
@ -477,7 +476,7 @@ end
##############################################################################
post '/service_pool/purge_done' do
service_pool = OpenNebula::ServicePool.new(nil, @client)
service_pool = OpenNebula::ServicePool.new(nil, cloud_auth.client(@username))
rc = service_pool.info
if OpenNebula.is_error?(rc)
@ -488,6 +487,7 @@ post '/service_pool/purge_done' do
service_pool.each_page(conf[:page_size]) do |service|
next unless service.state == Service::STATE['DONE']
# TODO: What if token expires in the middle?
service.delete
end
end
@ -500,8 +500,10 @@ end
##############################################################################
get '/service_template' do
s_template_pool = OpenNebula::ServiceTemplatePool
.new(@client, OpenNebula::Pool::INFO_ALL)
s_template_pool = OpenNebula::ServiceTemplatePool.new(
cloud_auth.client(@username),
OpenNebula::Pool::INFO_ALL
)
rc = s_template_pool.info
if OpenNebula.is_error?(rc)
@ -514,8 +516,10 @@ get '/service_template' do
end
get '/service_template/:id' do
service_template = OpenNebula::ServiceTemplate.new_with_id(params[:id],
@client)
service_template = OpenNebula::ServiceTemplate.new_with_id(
params[:id],
cloud_auth.client(@username)
)
rc = service_template.info
if OpenNebula.is_error?(rc)
@ -528,8 +532,10 @@ get '/service_template/:id' do
end
delete '/service_template/:id' do
service_template = OpenNebula::ServiceTemplate.new_with_id(params[:id],
@client)
service_template = OpenNebula::ServiceTemplate.new_with_id(
params[:id],
cloud_auth.client(@username)
)
begin
delete_type = JSON.parse(request.body.read)['delete_type']
rescue StandardError
@ -546,8 +552,11 @@ delete '/service_template/:id' do
end
put '/service_template/:id' do
service_template = OpenNebula::ServiceTemplate.new_with_id(params[:id],
@client)
service_template = OpenNebula::ServiceTemplate.new_with_id(
params[:id],
cloud_auth.client(@username)
)
rc = nil
begin
@ -574,7 +583,8 @@ end
post '/service_template' do
xml = OpenNebula::ServiceTemplate.build_xml
s_template = OpenNebula::ServiceTemplate.new(xml, @client)
s_template = OpenNebula::ServiceTemplate.new(xml,
cloud_auth.client(@username))
begin
rc = s_template.allocate(request.body.read)
@ -595,8 +605,11 @@ post '/service_template' do
end
post '/service_template/:id/action' do
service_template = OpenNebula::ServiceTemplate.new_with_id(params[:id],
@client)
service_template = OpenNebula::ServiceTemplate.new_with_id(
params[:id],
cloud_auth.client(@username)
)
action = JSON.parse(request.body.read)['action']
opts = action['params']
opts = {} if opts.nil?
@ -717,7 +730,7 @@ post '/service_template/:id/action' do
return internal_error(service.message, GENERAL_EC)
else
# Starts service deployment async
rc = lcm.deploy_nets_action(@client, service.id)
rc = lcm.deploy_nets_action(@username, service.id)
if OpenNebula.is_error?(rc)
return internal_error(rc.message, one_error_to_http(rc.errno))
@ -790,9 +803,12 @@ post '/service_template/:id/action' do
return internal_error(rc.message, GENERAL_EC)
end
new_stemplate = OpenNebula::ServiceTemplate.new_with_id(rc, @client)
new_stemplate = OpenNebula::ServiceTemplate.new_with_id(
rc,
cloud_auth.client(@username)
)
rc = new_stemplate.info
rc = new_stemplate.info
if OpenNebula.is_error?(rc)
return internal_error(rc.message, GENERAL_EC)

View File

@ -66,12 +66,16 @@ module OpenNebula
super('DOCUMENT_POOL', 'DOCUMENT', @client)
end
def client
def client(user_name = nil)
# If there's a client defined use it
return @client unless @client.nil?
# If not, get one via cloud_auth
@cloud_auth.client
if user_name.nil?
@cloud_auth.client
else
@cloud_auth.client(user_name)
end
end
def info
@ -129,14 +133,17 @@ module OpenNebula
# The mutex will be unlocked after the block execution.
#
# @return [Service, OpenNebula::Error] The Service in case of success
def get(service_id, external_client = nil, &block)
def get(service_id, external_user = nil, &block)
service_id = service_id.to_i if service_id
aux_client = nil
if external_client.nil?
# WARNING!!!
# No validation will be performed for external_user, the credentials
# for this user must be validated previously.
if external_user.nil?
aux_client = client
else
aux_client = external_client
aux_client = client(external_user)
end
service = Service.new_with_id(service_id, aux_client)
@ -172,7 +179,7 @@ module OpenNebula
return rc
end
block.call(service)
block.call(service, client)
end
@@mutex.synchronize do