class StompServer::QueueManager

Public Instance Methods

ack(connection, frame) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 128
def ack(connection, frame)
  puts "Acking #{frame.headers['message-id']}" if $DEBUG
  unless @pending[connection]
    puts "No message pending for connection!"
    return
  end
  msgid = frame.headers['message-id']
  p_msgid = @pending[connection].headers['message-id']
  if p_msgid != msgid
    # We don't know what happened, we requeue
    # (probably a client connecting to a restarted server)
    frame = @pending[connection]
    @qstore.requeue(frame.headers['destination'],frame)
    puts "Invalid message-id (received #{msgid} != #{p_msgid})"
  end
  @pending.delete connection
  # We are free to work now, look if there's something for us
  send_a_backlog(connection)
end
dequeue(dest) click to toggle source

For protocol handlers that want direct access to the queue

# File lib/stomp_server/queue_manager.rb, line 198
def dequeue(dest)
  @qstore.dequeue(dest)
end
disconnect(connection) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 148
def disconnect(connection)
  puts "Disconnecting"
  frame = @pending[connection]
  if frame
    @qstore.requeue(frame.headers['destination'],frame)
    @pending.delete connection
  end

  @queues.each do |dest, queue|
    queue.delete_if { |qu| qu.connection == connection }
    @queues.delete(dest) if queue.empty?
  end
end
enqueue(frame) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 202
def enqueue(frame)
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  @qstore.enqueue(dest,frame)
end
send_a_backlog(connection) click to toggle source

Send at most one frame to a connection used when use_ack == true

# File lib/stomp_server/queue_manager.rb, line 87
def send_a_backlog(connection)
  puts "Sending a backlog" if $DEBUG
  # lookup queues with data for this connection
  possible_queues = @queues.select{ |destination,users|
    @qstore.message_for?(destination) &&
      users.detect{|u| u.connection == connection}
  }
  if possible_queues.empty?
    puts "Nothing left" if $DEBUG
    return
  end
  # Get a random one (avoid artificial priority between queues
  # without coding a whole scheduler, which might be desirable later)
  dest,users = possible_queues[rand(possible_queues.length)]
  user = users.find{|u| u.connection == connection}
  frame = @qstore.dequeue(dest)
  puts "Chose #{dest}" if $DEBUG
  send_to_user(frame, user)
end
send_destination_backlog(dest,user) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 107
def send_destination_backlog(dest,user)
  puts "Sending destination backlog for #{dest}" if $DEBUG
  if user.ack
    # only send one message (waiting for ack)
    frame = @qstore.dequeue(dest)
    send_to_user(frame, user) if frame
  else
    while frame = @qstore.dequeue(dest)
      send_to_user(frame, user)
    end
  end
end
send_to_user(frame, user) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 162
def send_to_user(frame, user)
  connection = user.connection
  if user.ack
    raise "other connection's end already busy" if @pending[connection]
    @pending[connection] = frame
  end
  connection.stomp_send_data(frame)
end
sendmsg(frame) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 171
def sendmsg(frame)
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  puts "Sending a message to #{dest}: "
  # Lookup a user willing to handle this destination
  available_users = @queues[dest].reject{|user| @pending[user.connection]}
  if available_users.empty?
    @qstore.enqueue(dest,frame)
    return
  end

  # Look for a user with ack (we favor reliability)
  reliable_user = available_users.find{|u| u.ack}

  if reliable_user
    # give it a message-id
    @qstore.assign_id(frame, dest)
    send_to_user(frame, reliable_user)
  else
    random_user = available_users[rand(available_users.length)]
    # Note message-id header isn't set but we won't need it anyway
    # <TODO> could break some clients: fix this
    send_to_user(frame, random_user)
  end
end
stop() click to toggle source
# File lib/stomp_server/queue_manager.rb, line 74
def stop
  @qstore.stop if @qstore.methods.include?('stop')
end
subscribe(dest, connection, use_ack=false) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 78
def subscribe(dest, connection, use_ack=false)
  puts "Subscribing to #{dest}"
  user = Struct::QueueUser.new(connection, use_ack)
  @queues[dest] += [user]
  send_destination_backlog(dest,user) unless dest == '/queue/monitor'
end
unsubscribe(dest, connection) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 120
def unsubscribe(dest, connection)
  puts "Unsubscribing from #{dest}"
  @queues.each do |d, queue|
    queue.delete_if { |qu| qu.connection == connection and d == dest}
  end
  @queues.delete(dest) if @queues[dest].empty?
end

Public Class Methods

new(qstore) click to toggle source
# File lib/stomp_server/queue_manager.rb, line 63
def initialize(qstore)
  @qstore = qstore
  @queues = Hash.new { Array.new }
  @pending = Hash.new
  if $STOMP_SERVER
    monitor = StompServer::QueueMonitor.new(@qstore,@queues)
    monitor.start
    puts "Queue monitor started" if $DEBUG
  end
end