# Routes +msg+ to the job stored in +jobs+ under +msg.token+.
#
# Does nothing (and returns nil) when no job is registered for that token.
# Otherwise:
#
# 1. Hands the message to the job via +job.process+.
# 2. If the job has an intermediate handler and at least one pending key,
#    looks up the handler for that key and invokes it with the newest entry
#    of +job.intermediate_state[msg.from][key]+. The argument list depends on
#    the handler's arity:
#      2 => (value, job)
#      3 => (key, msg.from, value)
#      4 => (key, msg.from, value, job)
#    Handlers with any other arity are not called. The job's pending keys are
#    reset afterwards either way.
# 3. If the job reports +completed?+, removes it from +jobs+ and invokes the
#    +completed+ callback (when set) with (results) or (results, job),
#    again chosen by arity.
#
# Raises RuntimeError when more than one key is pending, since intermediate
# messages are expected to be dispatched one at a time as they arrive.
#
# Returns the completion callback's result when one was invoked, else nil.
def process(msg)
  job = jobs[msg.token]
  return unless job

  job.process(msg)

  if job.intermediate_handler && (job.pending_keys.size > 0)
    if job.pending_keys.size != 1
      raise "IntermediateMessages are currently dispatched as they arrive, shouldn't have more than one key in pending_keys: #{job.pending_keys.inspect}"
    end

    key = job.pending_keys.first
    handler = job.intermediate_handler_for_key(key)
    if handler
      # Evaluated lazily so the state lookup only happens for a supported arity.
      latest = -> { job.intermediate_state[msg.from][key].last }
      args = case handler.arity
             when 2 then [latest.call, job]
             when 3 then [key, msg.from, latest.call]
             when 4 then [key, msg.from, latest.call, job]
             end
      handler.call(*args) if args
    end
    job.reset_pending_intermediate_state_keys
  end

  return unless job.completed?

  # Unregister the job before notifying, so the callback sees it already gone.
  jobs.delete(job.token)
  on_complete = job.completed
  return unless on_complete

  case on_complete.arity
  when 1 then on_complete.call(job.results)
  when 2 then on_complete.call(job.results, job)
  end
end