diff --git a/src/mad/ruby/ActionManager.rb b/src/mad/ruby/ActionManager.rb new file mode 100644 index 0000000000..81a3df62ed --- /dev/null +++ b/src/mad/ruby/ActionManager.rb @@ -0,0 +1,211 @@ +# -------------------------------------------------------------------------- */ +# Copyright 2002-2009, Distributed Systems Architecture Group, Universidad */ +# Complutense de Madrid (dsa-research.org) */ +# Licensed under the Apache License, Version 2.0 (the "License"); you may */ +# not use this file except in compliance with the License. You may obtain */ +# a copy of the License at */ +# */ +# http://www.apache.org/licenses/LICENSE-2.0 */ +# */ +# Unless required by applicable law or agreed to in writing, software */ +# distributed under the License is distributed on an "AS IS" BASIS, */ +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ +# See the License for the specific language governing permissions and */ +# limitations under the License. */ +# -------------------------------------------------------------------------- */ + +require 'thread' + +=begin rdoc +Copyright 2002-2009, Distributed Systems Architecture Group, Universidad +Complutense de Madrid (dsa-research.org) + +This class provides support to handle actions. Class methods, or actions, can be +registered in the action manager. The manager will wait for actions to be +triggered (thread-safe), and will execute them concurrently. The action manager +can be used to synchronize different objects in different threads + +== Example + +class Sample + attr_reader :am + + def initialize + @am = ActionManager.new(15,true) + + @am.register_action("SLEEP",method("sleep_action")) + @am.register_action("FINALIZE",method("finalize_action"),false) + + @am.start_listener + end + + def sleep_action(secs) + sleep(secs) + end + + def finalize_action + p "Exiting..." + @am.stop_listener + end +end + + s = Sample.new + + s.am.trigger_action("SLEEP",rand(3)+1) + s.am.trigger_action("FINALIZE") +=end + +class ActionManager + + # Creates a new Action Manager + # + # +concurrency+ is the maximun number of actions that can run at the same + # time + # +threaded+ if true actions will be executed by default in a different + # thread + def initialize(concurrency=10, threaded=true) + @actions = Hash.new + @threaded = threaded + + @concurrency = concurrency + @action_queue = Array.new + @running_actions = 0 + + @listener_thread = nil + @threads_mutex = Mutex.new + @threads_cond = ConditionVariable.new + end + + # Registers a new action in the manager. An action is defined by: + # + # +aname+ name of the action, it will identify the action + # +method+ it's invoked with call. It should be a Proc or Method object + # +threaded+ execute the action in a new thread + def register_action(aname, method, threaded=nil) + threaded ||= @threaded + + @actions[aname]={ + :method => method, + :threaded => threaded + } + end + + # Triggers the execution of the action. + # + # +aname+ name of the action + # +aargs+ arguments to call the action + def trigger_action(aname,*aargs) + @threads_mutex.synchronize { + + if !@actions.has_key?(aname) + return + end + + if @actions[aname][:method].arity != aargs.length + return + end + + @action_queue << @actions[aname].merge(:args => aargs) + + if @running_actions < @concurrency + @threads_cond.signal + end + } + end + + def stop_listener + @listener_thread.kill! + end + + def start_listener + @listener_thread = Thread.new { + while true + @threads_mutex.synchronize { + while ((@concurrency - @running_actions)==0) || + @action_queue.size==0 + @threads_cond.wait(@threads_mutex) + end + + run_action + } + end + } + end + +private + + def run_action + action = @action_queue.shift + + if action + @running_actions += 1 + + if action[:threaded] + Thread.new { + action[:method].call(*action[:args]) + + @threads_mutex.synchronize { + @running_actions -= 1 + @threads_cond.signal + } + } + else + action[:method].call(*action[:args]) + + @running_actions -= 1 + @threads_cond.signal + end + end + end + +end + +if __FILE__ == $0 + + class Sample + attr_reader :am + + def initialize + @am = ActionManager.new(15,true) + + @am.register_action("SLEEP",method("sleep_action")) +# @am.register_action("SLEEP",Proc.new{|s,i| p s ; sleep(s)}) + @am.register_action("NOP",method("nop_action")) + @am.register_action("FINALIZE",method("finalize_action"),false) + + @am.start_listener + end + + def sleep_action(secs, id) + p "ID: #{id} sleeping #{secs} seconds" + sleep(secs) + p "ID: #{id} Awaken!" + end + + def nop_action + p " - Just an action" + end + + def finalize_action + p "Exiting..." + @am.stop_listener + p "Done!" + end + end + + s = Sample.new + + 100.times {|n| + 100.times {|m| + s.am.trigger_action("SLEEP",rand(3)+1,m+n) + s.am.trigger_action("NOP") + } + } + + sleep 10 + + s.am.trigger_action("FINALIZE") + + sleep 3600 +end + diff --git a/src/mad/ruby/OpenNebulaDriver.rb b/src/mad/ruby/OpenNebulaDriver.rb new file mode 100644 index 0000000000..6e8546551d --- /dev/null +++ b/src/mad/ruby/OpenNebulaDriver.rb @@ -0,0 +1,111 @@ +# -------------------------------------------------------------------------- */ +# Copyright 2002-2009, Distributed Systems Architecture Group, Universidad */ +# Complutense de Madrid (dsa-research.org) */ +# Licensed under the Apache License, Version 2.0 (the "License"); you may */ +# not use this file except in compliance with the License. You may obtain */ +# a copy of the License at */ +# */ +# http://www.apache.org/licenses/LICENSE-2.0 */ +# */ +# Unless required by applicable law or agreed to in writing, software */ +# distributed under the License is distributed on an "AS IS" BASIS, */ +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ +# See the License for the specific language governing permissions and */ +# limitations under the License. */ +# -------------------------------------------------------------------------- */ +require "ActionManager" + +# Author:: dsa-research.org +# Copyright:: (c) 2009 Universidad Computense de Madrid +# License:: Apache License + +# This class provides basic messaging and logging functionality +# to implement OpenNebula Drivers. A driver is a program that +# specialize the OpenNebula behavior by interfacing with specific +# infrastructure functionalities. +# +# A Driver inherits this class and only has to provide methods +# for each action it wants to receive. The method must be associated +# with the action name through the register_action func + +class OpenNebulaDriver + def initialize(concurrency=10, threaded=true) + @send_mutex=Mutex.new + + @am = ActionManager.new(concurrency,threaded) + @am.register_action("FINALIZE",method("stop_driver"),false) + end + + def send_message(*args) + @send_mutex.synchronize { + STDOUT.puts args.join(' ') + STDOUT.flush + } + end + + # Sends a log message to ONE. The +message+ can be multiline, it will + # be automatically splitted by lines. + def log(number, message) + msg=message.strip + msg.each_line {|line| + send_message("LOG", "-", number, line.strip) + } + end + + def start_driver + @am.start_listener + loop + end + +private + + def stop_driver + @am.stop_listener + exit(0) + end + + def loop + while true + exit(-1) if STDIN.eof? + + str=STDIN.gets + next if !str + + args = str.split(/\s+/) + next if args.length == 0 + + action = args.shift.upcase + + @am.trigger_action(action,*args) + end + end +end + +if __FILE__ == $0 + + class SampleDriver < OpenNebulaDriver + def initialize + super(5,true) + + @am.register_action("SLEEP",method("my_sleep")) + end + + def response(action,result,info) + send_message(action,result,info) + end + + def my_sleep(timeout, num) + log(num,"Sleeping #{timeout} seconds") + sleep(timeout.to_i) + log(num,"Done with #{num}") + + response("SLEEP","SUCCESS",num.to_s) + end + end + + sd = SampleDriver.new + sd.start_driver + + gets + +end \ No newline at end of file