class StompServer::ActiveRecordQueue

Attributes

checkpoint_interval[RW]

Public Instance Methods

affect_msgid_and_store(frame, queue_name) click to toggle source

store a frame (assigning it a message-id)

# File lib/stomp_server/queue/activerecord_queue.rb, line 73
def affect_msgid_and_store(frame, queue_name)
  msgid = assign_id(frame, queue_name)
  ArMessage.create!(:stomp_id => msgid, :frame => frame)
end
assign_id(frame, queue_name) click to toggle source
# File lib/stomp_server/queue/activerecord_queue.rb, line 82
def assign_id(frame, queue_name)
  msgid = @stompid[@frames[queue_name][:last_index] += 1]
  frame.headers['message-id'] = msgid
end
dequeue(queue_name) click to toggle source

Get and remove a frame from the queue

# File lib/stomp_server/queue/activerecord_queue.rb, line 53
def dequeue(queue_name)
  return nil unless @frames[queue_name] && !@frames[queue_name][:frames].empty?
  frame = @frames[queue_name][:frames].shift
  remove_from_store(frame.headers['message-id'])
  return frame
end
enqueue(queue_name, frame) click to toggle source

Add a frame to the queue

# File lib/stomp_server/queue/activerecord_queue.rb, line 41
def enqueue(queue_name, frame)
  unless @frames[queue_name]
    @frames[queue_name] = {
      :last_index => 0,
      :frames => [],
    }
  end
  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames] << frame
end
message_for?(queue_name) click to toggle source
# File lib/stomp_server/queue/activerecord_queue.rb, line 78
def message_for?(queue_name)
  @frames[queue_name] && !@frames[queue_name][:frames].empty?
end
remove_from_store(message_id) click to toggle source

remove a frame from the store

# File lib/stomp_server/queue/activerecord_queue.rb, line 68
def remove_from_store(message_id)
  ArMessage.find_by_stomp_id(message_id).destroy
end
requeue(queue_name, frame) click to toggle source

Requeue the frame previously pending

# File lib/stomp_server/queue/activerecord_queue.rb, line 61
def requeue(queue_name, frame)
  @frames[queue_name][:frames] << frame
  ArMessage.create!(:stomp_id => frame.headers['message-id'],
                    :frame => frame)
end

Public Class Methods

new(configdir, storagedir) click to toggle source
# File lib/stomp_server/queue/activerecord_queue.rb, line 14
def initialize(configdir, storagedir)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }
  # Load DB configuration
  db_config = "#{configdir}/database.yml"
  puts "reading from #{db_config}"
  if File.exists? db_config
    db_params.merge! YAML::load(File.open(db_config))
  end

  puts "using #{db_params['database']} DB"

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  # Development <TODO> fix this
  ActiveRecord::Base.logger = Logger.new(STDERR)
  ActiveRecord::Base.logger.level = Logger::INFO
  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end