diff --git a/src/vmm_mad/ec2/one_vmm_ec2.rb b/src/vmm_mad/ec2/one_vmm_ec2.rb index cd7b36b17b..a4ce862b05 100755 --- a/src/vmm_mad/ec2/one_vmm_ec2.rb +++ b/src/vmm_mad/ec2/one_vmm_ec2.rb @@ -38,9 +38,9 @@ end $: << RUBY_LIB_LOCATION -require 'pp' require "VirtualMachineDriver" require "CommandManager" +require 'scripts_common' require "rexml/document" # The main class for the EC2 driver @@ -48,27 +48,113 @@ class EC2Driver < VirtualMachineDriver # EC2 commands constants EC2 = { - :run => "#{EC2_LOCATION}/bin/ec2-run-instances", - :terminate => "#{EC2_LOCATION}/bin/ec2-terminate-instances", - :describe => "#{EC2_LOCATION}/bin/ec2-describe-instances", - :associate => "#{EC2_LOCATION}/bin/ec2-associate-address", - :reboot => "#{EC2_LOCATION}/bin/ec2-reboot-instances", - :authorize => "#{EC2_LOCATION}/bin/ec2-authorize" + :run => { + :cmd => "#{EC2_LOCATION}/bin/ec2-run-instances", + :args => { + "AKI" => { + :opt => '--kernel' + }, + "AMI" => { + :opt => '' + }, + "BLOCKDEVICEMAPPING" => { + :opt => '-b' + }, + "CLIENTTOKEN" => { + :opt => '--client-token' + }, + "INSTANCETYPE" => { + :opt => '-t' + }, + "KEYPAIR" => { + :opt => '-k' + }, + "LICENSEPOOL" => { + :opt => '--license-pool' + }, + "PLACEMENTGROUP" => { + :opt => '--placement-group' + }, + "PRIVATEIP" => { + :opt => '--private-ip-address' + }, + "RAMDISK" => { + :opt => '--ramdisk' + }, + "SUBNETID" => { + :opt => '-s' + }, + "TENANCY" => { + :opt => '--tenancy' + }, + "USERDATA" => { + :opt => '-d' + }, + "USERDATAFILE" => { + :opt => '-f' + }, + "SECURITYGROUPS" => { + :opt => '-g', + :proc => lambda {|str| str.split(',').join(' -g ')} + } + } + }, + :terminate => { + :cmd => "#{EC2_LOCATION}/bin/ec2-terminate-instances" + }, + :describe => { + :cmd => "#{EC2_LOCATION}/bin/ec2-describe-instances" + }, + :associate => { + :cmd => "#{EC2_LOCATION}/bin/ec2-associate-address", + :args => { + "SUBNETID" => { + :opt => '-a', + :proc => lambda {|str| ''} + }, + "ELASTICIP" => { + :opt => '' + } + } + }, + :authorize => { + :cmd => "#{EC2_LOCATION}/bin/ec2-authorize", + :args => { + "AUTHORIZEDPORTS" => { + :opt => '-p', + :proc => lambda {|str| str.split(',').join(' -p ')} + } + } + }, + :reboot => { + :cmd => "#{EC2_LOCATION}/bin/ec2-reboot-instances" + }, + :stop => { + :cmd => "#{EC2_LOCATION}/bin/ec2-stop-instances" + }, + :start => { + :cmd => "#{EC2_LOCATION}/bin/ec2-start-instances" + }, + :tags => { + :cmd => "#{EC2_LOCATION}/bin/ec2-create-tags", + :args => { + "TAGS" => { + :opt => '-t', + :proc => lambda {|str| str.split(',').join(' -t ')} + } + } + } } # EC2 constructor, loads defaults for the EC2Driver def initialize(ec2_conf = nil) - if !EC2_JVM_CONCURRENCY concurrency = 5 else concurrency = EC2_JVM_CONCURRENCY.to_i end - super('', - :concurrency => concurrency, - :threaded => true - ) + super('', :concurrency => concurrency, :threaded => true) @defaults = Hash.new @@ -83,18 +169,127 @@ class EC2Driver < VirtualMachineDriver return if !ec2 - @defaults["KEYPAIR"] = ec2_value(ec2,"KEYPAIR") - @defaults["AUTHORIZEDPORTS"] = ec2_value(ec2,"AUTHORIZEDPORTS") - @defaults["INSTANCETYPE"] = ec2_value(ec2,"INSTANCETYPE") + EC2.each {|action, hash| + if hash[:args] + hash[:args].each { |key, value| + @defaults[key] = value_from_xml(ec2, key) + } + end + } end end # DEPLOY action, also sets ports and ip if needed def deploy(id, drv_message) + ec2_info = get_deployment_info(drv_message) + return unless ec2_info + + if !ec2_value(ec2_info, 'AMI') + msg = "Can not find AMI in deployment file" + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + deploy_exe = exec_and_log_ec2(:run, ec2_info, id) + if deploy_exe.code != 0 + msg = deploy_exe.stderr + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + if !deploy_exe.stdout.match(/^INSTANCE\s*(.+?)\s/) + msg = "Could not find instance id. Check ec2-describe-instances" + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + deploy_id = $1 + + if ec2_value(ec2_info, 'AUTHORIZEDPORTS') + exec_and_log_ec2(:authorize, ec2_info, 'default', id) + end + + if ec2_value(ec2_info, 'TAGS') + exec_and_log_ec2(:tags, ec2_info, deploy_id, id) + end + + if ec2_value(ec2_info, 'ELASTICIP') + exec_and_log_ec2(:associate, ec2_info, "-i #{deploy_id}", id) + end + + send_message(ACTION[:deploy], RESULT[:success], id, deploy_id) + end + + # Shutdown a EC2 instance + def shutdown(id, drv_message) + ec2_action(drv_message, :terminate, ACTION[:shutdown], id) + end + + # Reboot a EC2 instance + def reboot(id, drv_message) + ec2_action(drv_message, :reboot, ACTION[:reboot], id) + end + + # Cancel a EC2 instance + def cancel(id, drv_message) + ec2_action(drv_message, :terminate, ACTION[:cancel], id) + end + + # Stop a EC2 instance + def save(id, drv_message) + ec2_action(drv_message, :stop, ACTION[:save], id) + end + + # Cancel a EC2 instance + def restore(id, drv_message) + ec2_action(drv_message, :start, ACTION[:restor], id) + end + + # Get info (IP, and state) for a EC2 instance + def poll(id, drv_message) + msg = decode(drv_message) + + deploy_id = msg.elements["DEPLOY_ID"].text + + info = "#{POLL_ATTRIBUTE[:usedmemory]}=0 " \ + "#{POLL_ATTRIBUTE[:usedcpu]}=0 " \ + "#{POLL_ATTRIBUTE[:nettx]}=0 " \ + "#{POLL_ATTRIBUTE[:netrx]}=0" + + + exe = exec_and_log_ec2(:describe, nil, deploy_id, id) + if exe.code != 0 + send_message(ACTION[:poll], RESULT[:failure], id, exe.stderr) + return + end + + exe.stdout.match(Regexp.new("INSTANCE\\s+#{deploy_id}\\s+(.+)")) + + if !$1 + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + else + monitor_data = $1.split(/\s+/) + + case monitor_data[3] + when "pending" + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}" + when "running" + info<<" #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}"<< + " IP=#{monitor_data[1]}" + when "shutting-down","terminated" + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + end + end + + send_message(ACTION[:poll], RESULT[:success], id, info) + end + +private + + def get_deployment_info(drv_message) msg = decode(drv_message) host = msg.elements["HOST"].text - local_dfile = msg.elements["LOCAL_DEPLOYMENT_FILE"].text if !local_dfile @@ -134,150 +329,67 @@ class EC2Driver < VirtualMachineDriver end end - ami = ec2_value(ec2,"AMI") - keypair = ec2_value(ec2,"KEYPAIR") - eip = ec2_value(ec2,"ELASTICIP") - ports = ec2_value(ec2,"AUTHORIZEDPORTS") - type = ec2_value(ec2,"INSTANCETYPE") - - if !ami - send_message(ACTION[:deploy],RESULT[:failure],id, - "Can not find AMI in deployment file #{local_dfile}") - return - end - - deploy_cmd = "#{EC2[:run]} #{ami}" - deploy_cmd << " -k #{keypair}" if keypair - deploy_cmd << " -t #{type}" if type - - deploy_exe = LocalCommand.run(deploy_cmd, log_method(id)) - - if deploy_exe.code != 0 - send_message(ACTION[:deploy],RESULT[:failure],id) - return - end - - if !deploy_exe.stdout.match(/^INSTANCE\s*(.+?)\s/) - send_message(ACTION[:deploy],RESULT[:failure],id, - "Could not find instance id. Check ec2-describe-instances") - return - end - - deploy_id = $1 - - if eip - ip_cmd = "#{EC2[:associate]} #{eip} -i #{deploy_id}" - ip_exe = LocalCommand.run(ip_cmd, log_method(id)) - end - - if ports - ports_cmd = "#{EC2[:authorize]} default -p #{ports}" - ports_exe = LocalCommand.run(ports_cmd, log_method(id)) - end - - send_message(ACTION[:deploy],RESULT[:success],id,deploy_id) + ec2 end - # Shutdown a EC2 instance - def shutdown(id, drv_message) + # Execute an EC2 command and send the SUCCESS or FAILURE signal + # +drv_message+: String, base64 encoded info sent by ONE + # +ec2_action+: Symbol, one of the keys of the EC2 hash constant (i.e :run) + # +one_action+: String, OpenNebula action + # +id+: String, action id + def ec2_action(drv_message, ec2_action, one_action, id) msg = decode(drv_message) - host = msg.elements["HOST"].text deploy_id = msg.elements["DEPLOY_ID"].text - ec2_terminate(ACTION[:shutdown], id, deploy_id) - end - - # Reboot a EC2 instance - def reboot(id, drv_message) - cmd = "#{EC2_LOCATION}/bin/ec2-reboot-instances #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - + exe = exec_and_log_ec2(ec2_action, nil, deploy_id, id) if exe.code != 0 - result = RESULT[:failure] + send_message(one_action, RESULT[:failure], id, exe.stderr) else - result = RESULT[:success] + send_message(one_action, RESULT[:success], id) end - - send_message(action,result,id) end - # Cancel a EC2 instance - def cancel(id, drv_message) - msg = decode(drv_message) + # Execute an EC2 command and log the message if error + # This function will build the command joining the :cmd value of the EC2 + # hash, the extra_params string and the options built from the :args schema + # of the EC2 hash and the xml + # +action+: Symbol, one of the keys of the EC2 hash constant (i.e :run) + # +xml+: REXML Document, containing EC2 information + # +extra_params+: String, extra information to be added to the command + def exec_and_log_ec2(action, xml, extra_params="", id) + cmd = EC2[action][:cmd].clone + cmd << ' ' << extra_params << ' ' if extra_params - host = msg.elements["HOST"].text - deploy_id = msg.elements["DEPLOY_ID"].text - - ec2_terminate(ACTION[:cancel], id, deploy_id) - end - - # Get info (IP, and state) for a EC2 instance - def poll(id, drv_message) - msg = decode(drv_message) - - host = msg.elements["HOST"].text - deploy_id = msg.elements["DEPLOY_ID"].text - - info = "#{POLL_ATTRIBUTE[:usedmemory]}=0 " \ - "#{POLL_ATTRIBUTE[:usedcpu]}=0 " \ - "#{POLL_ATTRIBUTE[:nettx]}=0 " \ - "#{POLL_ATTRIBUTE[:netrx]}=0" - - cmd = "#{EC2[:describe]} #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - - if exe.code != 0 - send_message(ACTION[:poll],RESULT[:failure],id) - return + if EC2[action][:args] + cmd << EC2[action][:args].map {|k,v| + str = ec2_value(xml, k, &v[:proc]) + v[:opt] + ' ' + str if str + }.join(' ') end - exe.stdout.match(Regexp.new("INSTANCE\\s+#{deploy_id}\\s+(.+)")) + LocalCommand.run(cmd, log_method(id)) + end - if !$1 - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + # Returns the value of the xml specified by the name or the default + # one if it does not exist + # +xml+: REXML Document, containing EC2 information + # +name+: String, xpath expression to retrieve the value + # +block+: Block, block to be applied to the value before returning it + def ec2_value(xml, name, &block) + value = value_from_xml(xml, name) || @defaults[name] + if block_given? && value + block.call(value) else - monitor_data = $1.split(/\s+/) - - case monitor_data[3] - when "pending" - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}" - when "running" - info<<" #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}"<< - " IP=#{monitor_data[1]}" - when "shutting-down","terminated" - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" - end + value end - - send_message(ACTION[:poll], RESULT[:success], id, info) end -private - - def ec2_terminate(action, id, deploy_id) - cmd = "#{EC2_LOCATION}/bin/ec2-terminate-instances #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - - if exe.code != 0 - result = RESULT[:failure] - else - result = RESULT[:success] + def value_from_xml(xml, name) + if xml + element = xml.elements[name] + element.text.strip if element && element.text end - - send_message(action,result,id) - end - - def ec2_value(xml,name) - value = nil - element = xml.elements[name] - value = element.text.strip if element && element.text - - if !value - value = @defaults[name] - end - - return value end end