diff --git a/src/vmm_mad/ec2/one_vmm_ec2.rb b/src/vmm_mad/ec2/one_vmm_ec2.rb index 3b851a835e..a4ce862b05 100755 --- a/src/vmm_mad/ec2/one_vmm_ec2.rb +++ b/src/vmm_mad/ec2/one_vmm_ec2.rb @@ -38,9 +38,9 @@ end $: << RUBY_LIB_LOCATION -require 'pp' require "VirtualMachineDriver" require "CommandManager" +require 'scripts_common' require "rexml/document" # The main class for the EC2 driver @@ -48,27 +48,113 @@ class EC2Driver < VirtualMachineDriver # EC2 commands constants EC2 = { - :run => "#{EC2_LOCATION}/bin/ec2-run-instances", - :terminate => "#{EC2_LOCATION}/bin/ec2-terminate-instances", - :describe => "#{EC2_LOCATION}/bin/ec2-describe-instances", - :associate => "#{EC2_LOCATION}/bin/ec2-associate-address", - :authorize => "#{EC2_LOCATION}/bin/ec2-authorize", - :tags => "#{EC2_LOCATION}/bin/ec2-create-tags" + :run => { + :cmd => "#{EC2_LOCATION}/bin/ec2-run-instances", + :args => { + "AKI" => { + :opt => '--kernel' + }, + "AMI" => { + :opt => '' + }, + "BLOCKDEVICEMAPPING" => { + :opt => '-b' + }, + "CLIENTTOKEN" => { + :opt => '--client-token' + }, + "INSTANCETYPE" => { + :opt => '-t' + }, + "KEYPAIR" => { + :opt => '-k' + }, + "LICENSEPOOL" => { + :opt => '--license-pool' + }, + "PLACEMENTGROUP" => { + :opt => '--placement-group' + }, + "PRIVATEIP" => { + :opt => '--private-ip-address' + }, + "RAMDISK" => { + :opt => '--ramdisk' + }, + "SUBNETID" => { + :opt => '-s' + }, + "TENANCY" => { + :opt => '--tenancy' + }, + "USERDATA" => { + :opt => '-d' + }, + "USERDATAFILE" => { + :opt => '-f' + }, + "SECURITYGROUPS" => { + :opt => '-g', + :proc => lambda {|str| str.split(',').join(' -g ')} + } + } + }, + :terminate => { + :cmd => "#{EC2_LOCATION}/bin/ec2-terminate-instances" + }, + :describe => { + :cmd => "#{EC2_LOCATION}/bin/ec2-describe-instances" + }, + :associate => { + :cmd => "#{EC2_LOCATION}/bin/ec2-associate-address", + :args => { + "SUBNETID" => { + :opt => '-a', + :proc => lambda {|str| ''} + }, + "ELASTICIP" => { + :opt => '' + } + } + }, + :authorize => { + :cmd => "#{EC2_LOCATION}/bin/ec2-authorize", + :args => { + "AUTHORIZEDPORTS" => { + :opt => '-p', + :proc => lambda {|str| str.split(',').join(' -p ')} + } + } + }, + :reboot => { + :cmd => "#{EC2_LOCATION}/bin/ec2-reboot-instances" + }, + :stop => { + :cmd => "#{EC2_LOCATION}/bin/ec2-stop-instances" + }, + :start => { + :cmd => "#{EC2_LOCATION}/bin/ec2-start-instances" + }, + :tags => { + :cmd => "#{EC2_LOCATION}/bin/ec2-create-tags", + :args => { + "TAGS" => { + :opt => '-t', + :proc => lambda {|str| str.split(',').join(' -t ')} + } + } + } } # EC2 constructor, loads defaults for the EC2Driver def initialize(ec2_conf = nil) - if !EC2_JVM_CONCURRENCY concurrency = 5 else concurrency = EC2_JVM_CONCURRENCY.to_i end - super('', - :concurrency => concurrency, - :threaded => true - ) + super('', :concurrency => concurrency, :threaded => true) @defaults = Hash.new @@ -83,18 +169,127 @@ class EC2Driver < VirtualMachineDriver return if !ec2 - @defaults["KEYPAIR"] = ec2_value(ec2,"KEYPAIR") - @defaults["AUTHORIZEDPORTS"] = ec2_value(ec2,"AUTHORIZEDPORTS") - @defaults["INSTANCETYPE"] = ec2_value(ec2,"INSTANCETYPE") + EC2.each {|action, hash| + if hash[:args] + hash[:args].each { |key, value| + @defaults[key] = value_from_xml(ec2, key) + } + end + } end end # DEPLOY action, also sets ports and ip if needed def deploy(id, drv_message) + ec2_info = get_deployment_info(drv_message) + return unless ec2_info + + if !ec2_value(ec2_info, 'AMI') + msg = "Can not find AMI in deployment file" + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + deploy_exe = exec_and_log_ec2(:run, ec2_info, id) + if deploy_exe.code != 0 + msg = deploy_exe.stderr + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + if !deploy_exe.stdout.match(/^INSTANCE\s*(.+?)\s/) + msg = "Could not find instance id. Check ec2-describe-instances" + send_message(ACTION[:deploy], RESULT[:failure], id, msg) + return + end + + deploy_id = $1 + + if ec2_value(ec2_info, 'AUTHORIZEDPORTS') + exec_and_log_ec2(:authorize, ec2_info, 'default', id) + end + + if ec2_value(ec2_info, 'TAGS') + exec_and_log_ec2(:tags, ec2_info, deploy_id, id) + end + + if ec2_value(ec2_info, 'ELASTICIP') + exec_and_log_ec2(:associate, ec2_info, "-i #{deploy_id}", id) + end + + send_message(ACTION[:deploy], RESULT[:success], id, deploy_id) + end + + # Shutdown a EC2 instance + def shutdown(id, drv_message) + ec2_action(drv_message, :terminate, ACTION[:shutdown], id) + end + + # Reboot a EC2 instance + def reboot(id, drv_message) + ec2_action(drv_message, :reboot, ACTION[:reboot], id) + end + + # Cancel a EC2 instance + def cancel(id, drv_message) + ec2_action(drv_message, :terminate, ACTION[:cancel], id) + end + + # Stop a EC2 instance + def save(id, drv_message) + ec2_action(drv_message, :stop, ACTION[:save], id) + end + + # Cancel a EC2 instance + def restore(id, drv_message) + ec2_action(drv_message, :start, ACTION[:restor], id) + end + + # Get info (IP, and state) for a EC2 instance + def poll(id, drv_message) + msg = decode(drv_message) + + deploy_id = msg.elements["DEPLOY_ID"].text + + info = "#{POLL_ATTRIBUTE[:usedmemory]}=0 " \ + "#{POLL_ATTRIBUTE[:usedcpu]}=0 " \ + "#{POLL_ATTRIBUTE[:nettx]}=0 " \ + "#{POLL_ATTRIBUTE[:netrx]}=0" + + + exe = exec_and_log_ec2(:describe, nil, deploy_id, id) + if exe.code != 0 + send_message(ACTION[:poll], RESULT[:failure], id, exe.stderr) + return + end + + exe.stdout.match(Regexp.new("INSTANCE\\s+#{deploy_id}\\s+(.+)")) + + if !$1 + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + else + monitor_data = $1.split(/\s+/) + + case monitor_data[3] + when "pending" + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}" + when "running" + info<<" #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}"<< + " IP=#{monitor_data[1]}" + when "shutting-down","terminated" + info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + end + end + + send_message(ACTION[:poll], RESULT[:success], id, info) + end + +private + + def get_deployment_info(drv_message) msg = decode(drv_message) host = msg.elements["HOST"].text - local_dfile = msg.elements["LOCAL_DEPLOYMENT_FILE"].text if !local_dfile @@ -134,293 +329,66 @@ class EC2Driver < VirtualMachineDriver end end - aki = ec2_value(ec2,"AKI") - ami = ec2_value(ec2,"AMI") - ports = ec2_value(ec2,"AUTHORIZEDPORTS") - blockmapping = ec2_value(ec2,"BLOCKDEVICEMAPPING") - clienttoken = ec2_value(ec2,"CLIENTTOKEN") - ec2region = host_info("deploy",host,"EC2REGION") - eip = ec2_value(ec2,"ELASTICIP") - type = ec2_value(ec2,"INSTANCETYPE") - keypair = ec2_value(ec2,"KEYPAIR") - licensepool = ec2_value(ec2,"LICENSEPOOL") - clustergroup = ec2_value(ec2,"PLACEMENTGROUP") - privateip = ec2_value(ec2,"PRIVATEIP") - ramdisk = ec2_value(ec2,"RAMDISK") - secgroup = ec2_value(ec2,"SECURITYGROUPS") - subnetid = ec2_value(ec2,"SUBNETID") - tags = ec2_value(ec2,"TAGS") - tenancy = ec2_value(ec2,"TENANCY") - vpcid = ec2_value(ec2,"VPCID") - waitb4eip = ec2_value(ec2,"WAITFORINSTANCE") - ec2context = "" - - # get context data, if any - all_context_elements = xml.root.get_elements("CONTEXT") - context_root = all_context_elements[0] - if context_root - context_ud = ec2_value(context_root,"USERDATA") - context_udf = ec2_value(context_root,"USERDATAFILE") - ec2context << " -d '#{context_ud}'" if context_ud - ec2context << " -f #{context_udf}" if context_udf - end - - if !ami - send_message(ACTION[:deploy],RESULT[:failure],id, - "Can not find AMI in deployment file #{local_dfile}") - return - end - - deploy_cmd = "#{EC2[:run]} --region #{ec2region} #{ec2context}" - deploy_cmd << " #{ami}" - deploy_cmd << " --kernel #{aki}" if aki - deploy_cmd << " -b #{blockmapping}" if blockmapping - deploy_cmd << " --client-token #{clienttoken}" if clienttoken - deploy_cmd << " -k #{keypair}" if keypair - deploy_cmd << " --license-pool #{licensepool}" if licensepool - deploy_cmd << " -t #{type}" if type - deploy_cmd << " --ramdisk #{ramdisk}" if ramdisk - deploy_cmd << " --placement-group #{clustergroup}" if clustergroup - if subnetid - deploy_cmd << " -s #{subnetid}" - deploy_cmd << " --private-ip-address #{privateip}" if privateip - deploy_cmd << " --tenancy #{tenancy}" if tenancy - end - if secgroup - for grouptok in secgroup.split(',') - deploy_cmd << " -g #{grouptok}" - end - end - - deploy_exe = LocalCommand.run(deploy_cmd, log_method(id)) - - if deploy_exe.code != 0 - send_message(ACTION[:deploy],RESULT[:failure],id) - return - end - - if !deploy_exe.stdout.match(/^INSTANCE\s*(.+?)\s/) - send_message(ACTION[:deploy],RESULT[:failure],id, - "Could not find instance id. Check ec2-describe-instances") - return - end - - deploy_id = $1 - - if ports - ports_cmd = "#{EC2[:authorize]} --region #{ec2region} default -p #{ports}" - ports_exe = LocalCommand.run(ports_cmd, log_method(id)) - end - - # adding EC2 tags if any are defined in the EC2 section. - # Tags should be defined as a TAG=VAL comma seperated string like - # TAGS="Tag1=Val1, Tag2=Val2, ..." - # - # Notes: - # - If 'Value' starts with a '$', the code will try to resolve it as - # a variable name. For example the special tag 'Name' can be use - # to define the name of the instance visible in the AWS Console - # with the following tag definition - # TAGS="Name=$NAME, tag2=val2, ..." - # - # To resolve the variables, the instance's deployment.0 file - # is parsed as follow: - # -> try to find an element corresponding to the variable name at - # the root of the xml tree - # -> if none is found, another search if tried at second level of - # the tree - # -> In both cases, the first match will be used so you know what - # to look at if you don't get what you expect... might be - # fixed in the future if needed. - if tags - tags_cmd = "#{EC2[:tags]} --region #{ec2region} #{deploy_id}" - for tag in tags.split(',') - token = tag.split('=') - t_regex = /^(.{1})(.*)$/ - t_match = t_regex.match(token[1]) - if t_match[1] == "$" - value = "" - element = xml.root.elements[t_match[2]] - element = xml.root.elements["*/" << t_match[2]] unless element - value = element.text.strip if element && element.text - tag = token[0].strip << "=" << value.chomp - end - tags_cmd << " -t #{tag}" - end - tags_exe = LocalCommand.run(tags_cmd, log_method(id)) - end - - if eip - if subnetid - ip_cmd = "#{EC2[:associate]} --region #{ec2region} -a #{eip} -i #{deploy_id}" - else - ip_cmd = "#{EC2[:associate]} --region #{ec2region} #{eip} -i #{deploy_id}" - end - - # Make sure instance is running state before assigning Elastic IP - if waitb4eip - pos=2 - pos=1 if subnetid - wait4instance(ec2region,id,deploy_id,pos,"running") - end - - ip_exe = LocalCommand.run(ip_cmd, log_method(id)) - if !ip_exe.stdout.match(/^ADDRESS\s*(.+?)\s/) - send_message(ACTION[:deploy],RESULT[:failure],id, - "Could not associate Elastic IP. Check template definition.") - return - end - end - - send_message(ACTION[:deploy],RESULT[:success],id,deploy_id) + ec2 end - # Shutdown a EC2 instance - def shutdown(id, drv_message) + # Execute an EC2 command and send the SUCCESS or FAILURE signal + # +drv_message+: String, base64 encoded info sent by ONE + # +ec2_action+: Symbol, one of the keys of the EC2 hash constant (i.e :run) + # +one_action+: String, OpenNebula action + # +id+: String, action id + def ec2_action(drv_message, ec2_action, one_action, id) msg = decode(drv_message) - host = msg.elements["HOST"].text deploy_id = msg.elements["DEPLOY_ID"].text - ec2_terminate(ACTION[:shutdown], id, deploy_id, host) - end - - # Reboot a EC2 instance - def reboot(id, drv_message) - ec2region = host_info("reboot",host,"EC2REGION") - - cmd = "#{EC2_LOCATION}/bin/ec2-reboot-instances --region #{ec2region} #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - + exe = exec_and_log_ec2(ec2_action, nil, deploy_id, id) if exe.code != 0 - result = RESULT[:failure] + send_message(one_action, RESULT[:failure], id, exe.stderr) else - result = RESULT[:success] + send_message(one_action, RESULT[:success], id) end - - send_message(action,result,id) end - # Cancel a EC2 instance - def cancel(id, drv_message) - msg = decode(drv_message) + # Execute an EC2 command and log the message if error + # This function will build the command joining the :cmd value of the EC2 + # hash, the extra_params string and the options built from the :args schema + # of the EC2 hash and the xml + # +action+: Symbol, one of the keys of the EC2 hash constant (i.e :run) + # +xml+: REXML Document, containing EC2 information + # +extra_params+: String, extra information to be added to the command + def exec_and_log_ec2(action, xml, extra_params="", id) + cmd = EC2[action][:cmd].clone + cmd << ' ' << extra_params << ' ' if extra_params - host = msg.elements["HOST"].text - deploy_id = msg.elements["DEPLOY_ID"].text - - ec2_terminate(ACTION[:cancel], id, deploy_id, host) - end - - # Get info (IP, and state) for a EC2 instance - def poll(id, drv_message) - msg = decode(drv_message) - - host = msg.elements["HOST"].text - deploy_id = msg.elements["DEPLOY_ID"].text - ec2region = host_info("poll",host,"EC2REGION") - - info = "#{POLL_ATTRIBUTE[:usedmemory]}=0 " \ - "#{POLL_ATTRIBUTE[:usedcpu]}=0 " \ - "#{POLL_ATTRIBUTE[:nettx]}=0 " \ - "#{POLL_ATTRIBUTE[:netrx]}=0" - - cmd = "#{EC2[:describe]} --region #{ec2region} --hide-tags #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - - if exe.code != 0 - send_message(ACTION[:poll],RESULT[:failure],id) - return + if EC2[action][:args] + cmd << EC2[action][:args].map {|k,v| + str = ec2_value(xml, k, &v[:proc]) + v[:opt] + ' ' + str if str + }.join(' ') end - exe.stdout.match(Regexp.new("INSTANCE\\s+#{deploy_id}\\s+(.+)")) + LocalCommand.run(cmd, log_method(id)) + end - if !$1 - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" + # Returns the value of the xml specified by the name or the default + # one if it does not exist + # +xml+: REXML Document, containing EC2 information + # +name+: String, xpath expression to retrieve the value + # +block+: Block, block to be applied to the value before returning it + def ec2_value(xml, name, &block) + value = value_from_xml(xml, name) || @defaults[name] + if block_given? && value + block.call(value) else - monitor_data = $1.split(/\s+/) - - case monitor_data[3] - when "pending" - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}" - when "running" - info<<" #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:active]}"<< - " IP=#{monitor_data[1]}" - when "shutting-down","terminated" - info << " #{POLL_ATTRIBUTE[:state]}=#{VM_STATE[:deleted]}" - end + value end - - send_message(ACTION[:poll], RESULT[:success], id, info) end -private - - def ec2_terminate(action, id, deploy_id, host) - ec2region = host_info("terminate",host,"EC2REGION") - - cmd = "#{EC2_LOCATION}/bin/ec2-terminate-instances --region #{ec2region} #{deploy_id}" - exe = LocalCommand.run(cmd, log_method(id)) - - if exe.code != 0 - result = RESULT[:failure] - else - result = RESULT[:success] - end - - send_message(action,result,id) - end - - def ec2_value(xml,name) - value = nil - element = xml.elements[name] - value = element.text.strip if element && element.text - - if !value - value = @defaults[name] - end - - return value - end - def host_info(action,host,key) - # get EC2REGION parameter from the selected host - query_cmd = "#{ONE_LOCATION}/bin/onehost show #{host}" - query_exe = LocalCommand.run(query_cmd, log_method(id)) - - if query_exe.code != 0 - send_message(action,RESULT[:failure],host) - return - end - - if !query_exe.stdout.match(/^#{key}=(.+?)\s/) - send_message(action,RESULT[:failure],host, - "Could not find #{param} parameter for host #{host}. Use onehost update #{host} to define this paramete first") - return - end - - return $1 - end - def wait4instance(region,id,deploy_id,pos,value) - found = 0 - while found == 0 do - poll_cmd = "#{EC2[:describe]} --region #{region} --hide-tags #{deploy_id}" - poll_exe = LocalCommand.run(poll_cmd, log_method(id)) - - if poll_exe.code != 0 - send_message(ACTION[:deploy],RESULT[:failure],id, - "Error polling instance.") - return - end - - poll_exe.stdout.match(Regexp.new("INSTANCE\\s+#{deploy_id}\\s+(.+)")) - - if !$1 - send_message(ACTION[:deploy],RESULT[:failure],id, - "ERROR: Instance not found and should have been created.") - return - else - monitor_data = $1.split(/\s+/) - found = 1 if monitor_data[pos] == value - end + def value_from_xml(xml, name) + if xml + element = xml.elements[name] + element.text.strip if element && element.text end end end