Store a frame (assigning it a message-id)
# File lib/stomp_server/queue/activerecord_queue.rb, line 73
# Give the frame a message-id, then persist it as an ArMessage row.
#
# frame      - frame to store; its 'message-id' header is set by assign_id.
# queue_name - key of the queue whose counter supplies the id.
#
# Returns whatever ArMessage.create! returns; create! failures propagate.
def affect_msgid_and_store(frame, queue_name)
  stomp_id = assign_id(frame, queue_name)
  ArMessage.create!(:frame => frame, :stomp_id => stomp_id)
end
# File lib/stomp_server/queue/activerecord_queue.rb, line 82
# Bump the queue's :last_index counter and stamp the frame with the id
# that @stompid yields for the new counter value.
#
# frame      - object exposing a mutable `headers` hash.
# queue_name - key into @frames; the queue entry must already exist.
#
# Returns the message-id written to frame.headers['message-id'].
def assign_id(frame, queue_name)
  queue = @frames[queue_name]
  queue[:last_index] += 1
  frame.headers['message-id'] = @stompid[queue[:last_index]]
end
Get and remove a frame from the queue
# File lib/stomp_server/queue/activerecord_queue.rb, line 53
# Take the oldest frame off the named queue and delete it from the store.
#
# queue_name - key into @frames.
#
# Returns the removed frame, or nil when the queue is unknown or empty.
def dequeue(queue_name)
  queue = @frames[queue_name]
  return nil if !queue || queue[:frames].empty?

  head = queue[:frames].shift
  # The in-memory entry is already gone; now drop the persisted copy.
  remove_from_store(head.headers['message-id'])
  head
end
Add a frame to the queue
# File lib/stomp_server/queue/activerecord_queue.rb, line 41
# Append a frame to the named queue, creating the queue entry on first use.
#
# queue_name - key into @frames.
# frame      - frame to add; it receives a message-id and is persisted
#              (via affect_msgid_and_store) before being queued in memory.
#
# Returns the queue's in-memory frames array.
def enqueue(queue_name, frame)
  @frames[queue_name] ||= { :last_index => 0, :frames => [] }
  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames].push(frame)
end
# File lib/stomp_server/queue/activerecord_queue.rb, line 78
# Tell whether the named queue currently holds at least one frame.
#
# queue_name - key into @frames.
#
# Returns true when frames are waiting, false when the queue exists but is
# empty, and nil when the queue is unknown.
def message_for?(queue_name)
  queue = @frames[queue_name]
  queue && !queue[:frames].empty?
end
Remove a frame from the store
# File lib/stomp_server/queue/activerecord_queue.rb, line 68
# Delete the persisted ArMessage row for the given message-id.
#
# message_id - value looked up through ArMessage.find_by_stomp_id.
#
# A missing row is treated as already removed instead of raising
# NoMethodError on nil: dequeue calls this after it has shifted the frame
# out of memory, so raising here would lose that frame.
#
# Returns the result of destroy, or nil when no row matched.
def remove_from_store(message_id)
  record = ArMessage.find_by_stomp_id(message_id)
  record.destroy if record
end
Requeue a frame that was previously pending
# File lib/stomp_server/queue/activerecord_queue.rb, line 61
# Put a frame back on the named queue and persist it again under the
# message-id it already carries.
#
# queue_name - key into @frames; the queue entry must already exist
#              (a NoMethodError is raised otherwise, before anything is stored).
# frame      - frame to requeue; frame.headers['message-id'] is reused as-is.
#
# Returns whatever ArMessage.create! returns; create! failures propagate.
def requeue(queue_name, frame)
  # Resolve the in-memory list first so an unknown queue fails before the store is touched.
  pending = @frames[queue_name][:frames]
  # Persist before mutating memory (same order as enqueue): if create! raises,
  # the in-memory queue is left untouched and stays consistent with the store.
  record = ArMessage.create!(:stomp_id => frame.headers['message-id'], :frame => frame)
  pending << frame
  record
end
# File lib/stomp_server/queue/activerecord_queue.rb, line 14
# Connect ActiveRecord to the message database and prepare the queue state.
#
# configdir  - directory searched for an optional database.yml; the default
#              SQLite database path is also built from it.
# storagedir - accepted but not used by this method.
#
# Connection settings start from SQLite defaults and are overridden by the
# top-level keys of database.yml when that file exists and is non-empty.
def initialize(configdir, storagedir)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }

  # Load DB configuration
  db_config = "#{configdir}/database.yml"
  puts "reading from #{db_config}"
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if File.exist?(db_config)
    # load_file opens and closes the file itself (no leaked handle).
    overrides = YAML.load_file(db_config)
    # An empty YAML file parses to nil/false; keep the defaults in that case.
    db_params.merge!(overrides) if overrides
  end
  puts "using #{db_params['database']} DB"

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  # Development <TODO> fix this
  ActiveRecord::Base.logger = Logger.new(STDERR)
  ActiveRecord::Base.logger.level = Logger::INFO

  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end