# Routes a deliverable to an actor method and schedules its execution.
#
# The deliverable's type is split on '/'; the first segment is discarded,
# the second is handed to registry.actor_for to find the actor, and the third
# names the method to call (falling back to :index when absent).
#
# The actor method receives the payload, plus the deliverable itself when the
# method's arity is exactly 2. It is also given a block that forwards whatever
# it is called with to handle_intermediate_results.
#
# Anything raised while preparing or running the call is passed to
# handle_exception, and that method's return value stands in for the result.
#
# When the deliverable is a Request, the outcome is wrapped in a Result,
# logged at debug level and published to the deliverable's reply_to queue.
#
# @param deliverable object responding to type and payload (and to token and
#   reply_to when it is a Request)
# @return whatever @evmclass.next_tick or @evmclass.defer returns
def dispatch(deliverable)
  actor_prefix, action = deliverable.type.split('/')[1..-1]
  action ||= :index
  actor = registry.actor_for(actor_prefix)

  # Runs the actor method; every failure is funnelled into handle_exception.
  work = lambda do
    begin
      payload = deliverable.payload
      call_args = actor.method(action).arity == 2 ? [payload, deliverable] : [payload]
      progress = lambda { |*partial| self.handle_intermediate_results(actor, action, deliverable, *partial) }
      actor.send(action, *call_args, &progress)
    rescue Exception => error
      handle_exception(actor, action, deliverable, error)
    end
  end

  # Publishes a Result for Requests; any other deliverable passes through as-is.
  reply = lambda do |outcome|
    next outcome unless deliverable.kind_of?(Request)

    result = Result.new(deliverable.token, deliverable.reply_to, outcome, identity)
    Nanite::Log.debug("SEND #{result.to_s([])}")
    reply_queue = amq.queue(deliverable.reply_to, :no_declare => options[:secure])
    reply_queue.publish(serializer.dump(result))
    result
  end

  # Single-threaded mode or a pool of one: run on the next tick rather than
  # handing the work to defer.
  run_inline = @options[:single_threaded] || @options[:thread_poolsize] == 1
  if run_inline
    @evmclass.next_tick { reply.call(work.call) }
  else
    @evmclass.defer(work, reply)
  end
end