class AMQP::Channel

h2. What are AMQP channels

To quote {files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}:

AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

h2. Opening a channel

*Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode.

@example Opening a channel with a callback

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    # when this block is executed, channel is open and ready for use
  end
end

Unless your application needs multiple channels, this approach is recommended. Alternatively, AMQP::Channel can be instantiated without a block. The returned channel is not immediately open; however, it can be used as if opening it were a synchronous, blocking operation:

@example Instantiating a channel that will be open eventually

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  channel  = AMQP::Channel.new(client)
  exchange = channel.default_exchange

  # ...
end

Even though the channel in the example above isn’t immediately open, it is safe to declare exchanges using it. Exchange declaration will be delayed until after the channel is open. The same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on the channel being open will be enqueued and executed in a FIFO manner when the broker confirms channel opening. Note, however, that *this “pseudo-synchronous mode” is easy to abuse and can introduce race conditions the AMQP gem cannot resolve for you*. AMQP is an inherently asynchronous protocol and the AMQP gem embraces this fact.
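
The queueing behaviour of the pseudo-synchronous mode can be pictured with a small, broker-free model. This is only a sketch: @ToyChannel@, @declare@ and the way @handle_open_ok@ is used here are hypothetical stand-ins for illustration, not the AMQP gem's implementation. Operations requested before the channel is open are buffered and replayed in FIFO order once the (simulated) broker confirmation arrives.

```ruby
# Toy model (not the AMQP gem): operations issued before the channel
# is open are buffered and replayed in FIFO order once it opens.
class ToyChannel
  attr_reader :log

  def initialize
    @open    = false
    @pending = [] # operations waiting for the channel to open
    @log     = [] # what actually "went over the wire", in order
  end

  # Hypothetical stand-in for any operation that needs an open channel.
  def declare(kind, name)
    operation = -> { @log << "#{kind}.declare #{name}" }
    if @open
      operation.call
    else
      @pending << operation
    end
    self
  end

  # Simulates the broker confirming that the channel is open.
  def handle_open_ok
    @open = true
    @pending.shift.call until @pending.empty?
  end
end

channel = ToyChannel.new
channel.declare(:exchange, "index.updates").declare(:queue, "search")
channel.log            # still empty: nothing runs before the channel opens
channel.handle_open_ok
channel.log            # both declarations, in the order they were requested
```

The model also hints at where races come from: code that reads state right after calling @declare@ may run before the buffered operation has been executed.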

h2. Key methods

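Key methods of the Channel class are {#queue}, {#default_exchange}, {#direct}, {#fanout}, {#topic} and {#close}. Refer to the documentation for those methods for usage examples.

Channel provides a number of convenience methods that instantiate queues and exchanges of various types associated with this channel: {#queue}, {#default_exchange}, {#direct}, {#fanout}, {#headers} and {#topic}.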

h2. Error handling

It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using on_error:

@example Queue declaration with incompatible attributes results in a channel-level exception

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    puts "Channel ##{channel.id} is now open!"

    channel.on_error do |ch, close|
      puts "Handling channel-level exception"

      connection.close {
        EM.stop { exit }
      }
    end

    EventMachine.add_timer(0.4) do
      # these two definitions result in a race condition. For sake of this example,
      # however, it does not matter. Whatever definition succeeds first, 2nd one will
      # cause a channel-level exception (because attributes are not identical)
      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
        puts "#{queue.name} is ready to go"
      end

      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
        puts "#{queue.name} is ready to go"
      end
    end
  end
end

When a channel-level exception is indicated by the broker and the errback defined using on_error is run, the channel is already closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from channel-level exceptions is to open a new channel and re-instantiate the queues, exchanges and bindings your application needs.

h2. Closing a channel

A channel is opened when the Channel object is instantiated and closed using the {close} method when the application no longer needs it.

@example Closing a channel your application no longer needs

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    channel.close do |close_ok|
      # when this block is executed, channel is successfully closed
    end
  end
end

h2. RabbitMQ extensions

AMQP gem supports several RabbitMQ extensions that extend Channel functionality. Learn more in {file:docs/VendorSpecificExtensions.textile}.

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5)

Constants

DEFAULT_REPLY_TEXT
RECOVERY_EVENTS

Attributes

auto_recovery[RW]

@return [Boolean] true if this channel is in automatic recovery mode
@see auto_recovering?

conn[R]

AMQP connection this channel is part of.

@return [Connection]

connection[R]

AMQP connection this channel is part of.

@return [Connection]

consumers_awaiting_cancel_ok[R]
consumers_awaiting_consume_ok[R]
exchanges_awaiting_bind_ok[R]
exchanges_awaiting_declare_ok[R]
exchanges_awaiting_delete_ok[R]
exchanges_awaiting_unbind_ok[R]
flow_is_active[RW]
id[R]
publisher_index[W]

Change publisher index. Publisher index is incremented by 1 after each Basic.Publish, starting at 1. This is done on both client and server, hence acknowledged messages can be matched via their delivery tags.

@api private

queues_awaiting_bind_ok[R]
queues_awaiting_declare_ok[R]
queues_awaiting_delete_ok[R]
queues_awaiting_get_response[R]
queues_awaiting_purge_ok[R]
queues_awaiting_unbind_ok[R]
status[R]

Status of this channel (one of: :opening, :closing, :opened, :closed)

@return [Symbol]

Public Instance Methods

acknowledge(delivery_tag, multiple = false)

Acknowledges one or more messages on the channel. When multiple is true, all outstanding messages up to and including delivery_tag are acknowledged.

@api public
@see reject
@see recover
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)

# File lib/amqp/channel.rb, line 1020
def acknowledge(delivery_tag, multiple = false)
  @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))

  self
end
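
To illustrate what the multiple flag means, here is a broker-free sketch. @ToyUnackedSet@ is a hypothetical name used only for this illustration; it models the AMQP 0.9.1 rule that an acknowledgement with multiple set covers every outstanding delivery tag up to and including the given one.

```ruby
# Toy model of the bookkeeping for unacknowledged deliveries.
class ToyUnackedSet
  def initialize(delivery_tags)
    @unacked = delivery_tags.sort
  end

  # With multiple = false only the given tag is acknowledged; with
  # multiple = true every outstanding tag up to and including it is.
  def acknowledge(delivery_tag, multiple = false)
    if multiple
      @unacked.reject! { |tag| tag <= delivery_tag }
    else
      @unacked.delete(delivery_tag)
    end
    self
  end

  def outstanding
    @unacked.dup
  end
end

set = ToyUnackedSet.new([1, 2, 3, 4, 5])
set.acknowledge(2)        # acknowledges delivery 2 only
set.acknowledge(4, true)  # acknowledges 1, 3 and 4
set.outstanding           # => [5]
```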
after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auto_recover()

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amqp/channel.rb, line 315
def auto_recover
  return unless auto_recovering?

  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end
auto_recovering?()

@return [Boolean] true if this channel uses automatic recovery mode

# File lib/amqp/channel.rb, line 307
def auto_recovering?
  @auto_recovery
end
before_recovery(&block)

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/channel.rb, line 1242
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)

Closes AMQP channel.

@api public

# File lib/amqp/channel.rb, line 949
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  self.once_open do
    self.status = :closing
    @connection.send_frame(AMQ::Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))

    self.redefine_callback :close, &block
  end
end
closing?()

@return [Boolean] true if this channel is in the process of closing
@api public

# File lib/amqp/channel.rb, line 942
def closing?
  self.status == :closing
end
confirm_select(nowait = false, &block)

@group Publisher Confirms

# File lib/amqp/channel.rb, line 1118
def confirm_select(nowait = false, &block)
  self.once_open do
    if nowait && block
      raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
    end

    @uses_publisher_confirmations = true
    reset_publisher_index!

    self.redefine_callback(:confirm_select, &block) unless nowait
    self.redefine_callback(:after_publish) do
      increment_publisher_index!
    end
    @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, nowait))

    self
  end
end
consumers()

@return [Hash<String, Consumer>]

# File lib/amqp/channel.rb, line 1182
def consumers
  @consumers
end
default_exchange()

Returns exchange object with the same name as default (aka unnamed) exchange. Default exchange is a direct exchange and automatically routes messages to queues when routing key matches queue name exactly. This feature is known as “automatic binding” (of queues to default exchange).

*Use default exchange when you want to route messages directly to specific queues* (queue names are known, you don’t mind this kind of coupling between applications).

@example Using default exchange to publish messages to queues with known names

AMQP.start(:host => 'localhost') do |connection|
  ch        = AMQP::Channel.new(connection)

  queue1    = ch.queue("queue1").subscribe do |payload|
    puts "[#{queue1.name}] => #{payload}"
  end
  queue2    = ch.queue("queue2").subscribe do |payload|
    puts "[#{queue2.name}] => #{payload}"
  end
  queue3    = ch.queue("queue3").subscribe do |payload|
    puts "[#{queue3.name}] => #{payload}"
  end
  queues    = [queue1, queue2, queue3]

  # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
  exchange = AMQP::Exchange.default
  EM.add_periodic_timer(1) do
    q = queues.sample

    exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
  end
end

@see Exchange
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4)

@return [Exchange]
@api public

# File lib/amqp/channel.rb, line 490
def default_exchange
  @default_exchange ||= Exchange.default(self)
end
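
The “automatic binding” rule can be modelled without a broker. In this sketch @ToyBroker@ and its methods are hypothetical names chosen for illustration; it shows only the routing rule, namely that a message published to the default exchange lands in the queue whose name equals the routing key, and is not routed anywhere when no such queue exists.

```ruby
# Toy model of default-exchange routing (no broker involved).
class ToyBroker
  def initialize
    @queues = {}
  end

  # Declaring a queue implicitly binds it to the default exchange
  # with a routing key equal to the queue name.
  def declare_queue(name)
    @queues[name] ||= []
  end

  # Returns true if the message was routed to a queue, false otherwise.
  def publish_to_default_exchange(payload, routing_key)
    queue = @queues[routing_key]
    return false unless queue

    queue << payload
    true
  end

  def messages_in(name)
    @queues.fetch(name).dup
  end
end

broker = ToyBroker.new
broker.declare_queue("queue1")
broker.publish_to_default_exchange("hello", "queue1") # routed to queue1
broker.publish_to_default_exchange("lost", "queue9")  # no such queue: not routed
```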
direct(name = 'amq.direct', opts = {}, &block)

Defines, initializes and returns a direct Exchange instance.

Learn more about direct exchanges in {Exchange Exchange class documentation}.

@param [String] name (amq.direct) Exchange name.

@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

@example Using a direct exchange and no callbacks (pseudo-synchronous style)

# an exchange application A will be using to publish updates
# to some search index
exchange = channel.direct("index.updates")

# In the same (or different) process declare a queue that broker will
# generate name for, bind it to aforementioned exchange using method chaining
queue    = channel.queue("").
                   # queue will be receiving messages that were published with
                   # :routing_key attribute value of "search.index.updates"
                   bind(exchange, :routing_key => "search.index.updates").
                   # register a callback that will be run when messages arrive
                   subscribe { |header, message| puts("Received #{message}") }

# now publish a new document contents for indexing,
# message will be delivered to the queue we declared and bound on the line above
exchange.publish(document.content, :routing_key => "search.index.updates")

@example Instantiating a direct exchange using {#direct} with a callback

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    channel.direct("email.replies_listener") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

@see #default_exchange
@see Exchange
@see Exchange#initialize
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1)

@return [Exchange]
@api public

# File lib/amqp/channel.rb, line 437
def direct(name = 'amq.direct', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:direct, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :direct, name, opts, &block))
  end
end
exchanges()

@return [Hash<String, Exchange>] Collection of exchanges that were declared on this channel, keyed by exchange name.

# File lib/amqp/channel.rb, line 1188
def exchanges
  @exchanges
end
fanout(name = 'amq.fanout', opts = {}, &block)

Defines, initializes and returns a fanout Exchange instance.

Learn more about fanout exchanges in {Exchange Exchange class documentation}.

@param [String] name (amq.fanout) Exchange name.

@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

@example Using fanout exchange to deliver messages to multiple consumers

# open up a channel
# declare a fanout exchange
# declare 3 queues, bind them
# publish a message

@see Exchange
@see Exchange#initialize
@see #default_exchange
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2)

@return [Exchange]
@api public

# File lib/amqp/channel.rb, line 540
def fanout(name = 'amq.fanout', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:fanout, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :fanout, name, opts, &block))
  end
end
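
Fanout semantics can be sketched without a broker. @ToyFanoutExchange@ below is a hypothetical model, not part of the AMQP gem, and queues are represented as plain arrays. It follows the steps outlined in the example above (declare an exchange, bind three queues, publish one message) and shows that every bound queue receives a copy.

```ruby
# Toy fanout exchange (no broker involved): every bound queue gets a copy.
class ToyFanoutExchange
  def initialize
    @queues = []
  end

  def bind(queue)
    @queues << queue
    self
  end

  # The routing key is accepted but ignored, as fanout exchanges ignore it.
  def publish(payload, routing_key = "")
    @queues.each { |queue| queue << payload }
    self
  end
end

queues   = [[], [], []]          # three queues, modelled as plain arrays
exchange = ToyFanoutExchange.new
queues.each { |queue| exchange.bind(queue) }
exchange.publish("Hello, everyone!")
queues # => [["Hello, everyone!"], ["Hello, everyone!"], ["Hello, everyone!"]]
```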
find_exchange(name)

Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.

@param [String] name Exchange name
@return [AMQP::Exchange] Exchange (if found)
@api plugin

# File lib/amqp/channel.rb, line 1393
def find_exchange(name)
  @exchanges[name]
end
find_queue(name)

@api plugin
@private

# File lib/amqp/channel.rb, line 1407
def find_queue(name)
  @queues[name]
end
flow(active = false, &block)

Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to AMQP::Queue#get callers.

@param [Boolean] active Desired flow state.

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
@api public

# File lib/amqp/channel.rb, line 975
def flow(active = false, &block)
  @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))

  self.redefine_callback :flow, &block
  self
end
flow_is_active?()

@return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).

@api public

# File lib/amqp/channel.rb, line 985
def flow_is_active?
  @flow_is_active
end
handle_basic_ack(method)

Handler for Basic.Ack. By default, it just executes the hook specified via the {#on_ack} method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Ack).

@api plugin

# File lib/amqp/channel.rb, line 1360
def handle_basic_ack(method)
  self.exec_callback(:ack, method)
end
handle_basic_nack(method)

Handler for Basic.Nack. By default, it just executes the hook specified via the {#on_nack} method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Nack).

@api plugin

# File lib/amqp/channel.rb, line 1371
def handle_basic_nack(method)
  self.exec_callback(:nack, method)
end
handle_close(channel_close)

@api plugin
@private

# File lib/amqp/channel.rb, line 1460
def handle_close(channel_close)
  self.status = :closed
  self.connection.clear_frames_on(self.id)

  self.exec_callback_yielding_self(:error, channel_close)
end
handle_close_ok(close_ok)

@api plugin
@private

# File lib/amqp/channel.rb, line 1450
def handle_close_ok(close_ok)
  self.status = :closed
  self.connection.clear_frames_on(self.id)
  self.exec_callback_once_yielding_self(:close, close_ok)

  @connection.release_channel_id(@id)
end
handle_connection_interruption(method = nil)

Overrides superclass method to also re-create @channel_is_open_deferrable.

@api plugin
@private

# File lib/amqp/channel.rb, line 1170
def handle_connection_interruption(method = nil)
  @queues.each    { |name, q| q.handle_connection_interruption(method) }
  @exchanges.each { |name, e| e.handle_connection_interruption(method) }

  self.exec_callback_yielding_self(:after_connection_interruption)
  self.reset_state!

  @connection.release_channel_id(@id) unless auto_recovering?
  @channel_is_open_deferrable = AMQP::Deferrable.new
end
handle_open_ok(open_ok)

@api plugin
@private

# File lib/amqp/channel.rb, line 1443
def handle_open_ok(open_ok)
  self.status = :opened
  self.exec_callback_once_yielding_self(:open, open_ok)
end
handle_select_ok(method)

Handler for Confirm.Select-Ok. By default, it just executes the hook specified via the {#confirm_select} method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Confirm::SelectOk), and then deletes the callback, since Confirm.Select is supposed to be sent just once.

@api plugin

# File lib/amqp/channel.rb, line 1350
def handle_select_ok(method)
  self.exec_callback_once(:confirm_select, method)
end
headers(name = 'amq.match', opts = {}, &block)

Defines, initializes and returns a headers Exchange instance.

Learn more about headers exchanges in {Exchange Exchange class documentation}.

@param [String] name (amq.match) Exchange name.

@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

@example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)

puts "=> Headers routing example"
puts
AMQP.start do |connection|
  channel   = AMQP::Channel.new(connection)
  channel.on_error do |ch, channel_close|
    puts "A channel-level exception: #{channel_close.inspect}"
  end

  exchange = channel.headers("amq.match", :durable => true)

  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
    puts "[linux/x64] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
    puts "[linux/x32] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
    puts "[linux] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
    puts "[macosx|octocore] Got a message: #{payload}"
  end

  EventMachine.add_timer(0.5) do
    exchange.publish "For linux/x64",   :headers => { :arch => "x64", :os => 'linux' }
    exchange.publish "For linux/x32",   :headers => { :arch => "x32", :os => 'linux' }
    exchange.publish "For linux",       :headers => { :os => 'linux'  }
    exchange.publish "For OS X",        :headers => { :os => 'macosx' }
    exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
    exchange.publish "For octocore",    :headers => { :cores => 8  }
  end

  show_stopper = Proc.new do
    $stdout.puts "Stopping..."
    connection.close {
      EventMachine.stop { exit }
    }
  end

  Signal.trap "INT", show_stopper
  EventMachine.add_timer(2, show_stopper)
end

@see Exchange
@see Exchange#initialize
@see #default_exchange
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)

@return [Exchange]
@api public

# File lib/amqp/channel.rb, line 753
def headers(name = 'amq.match', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:headers, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :headers, name, opts, &block))
  end
end
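
The 'x-match' argument used in the bindings of the example above decides how binding arguments are compared with message headers. The helper below is a broker-free sketch of that comparison; @headers_match?@ is a hypothetical name, not part of the AMQP gem, and it assumes that a missing 'x-match' means 'all'. With 'all' every binding argument must be present in the message headers with the same value, with 'any' a single matching pair is enough.

```ruby
# Toy re-implementation of headers-exchange matching (no broker involved).
# binding_args: the :arguments hash given to bind, including 'x-match'.
# headers:      the :headers hash given to publish.
def headers_match?(binding_args, headers)
  args     = binding_args.map { |key, value| [key.to_s, value] }.to_h
  headers  = headers.map { |key, value| [key.to_s, value] }.to_h
  mode     = args.delete("x-match") || "all" # assumed default
  matching = args.count { |key, value| headers[key] == value }

  mode == "any" ? matching > 0 : matching == args.size
end

linux_x64 = { 'x-match' => 'all', :arch => "x64", :os => 'linux' }
mac_or_8  = { 'x-match' => 'any', :os => 'macosx', :cores => 8 }

headers_match?(linux_x64, { :arch => "x64", :os => 'linux' }) # => true
headers_match?(linux_x64, { :os => 'linux' })                 # => false, :arch is missing
headers_match?(mac_or_8, { :cores => 8 })                     # => true, one match is enough
```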
increment_publisher_index!()

This method is executed after each message is published via {Exchange#publish}. Currently it just increments the publisher index by 1, so that messages can be matched to acknowledgements.

@api plugin

# File lib/amqp/channel.rb, line 1301
def increment_publisher_index!
  @publisher_index += 1
end
on_ack(nowait = false, &block)

Registers a callback for Basic.Ack from the broker, if a block is given. Confirmations themselves are turned on with {#confirm_select}.

@raise [RuntimeError] Occurs when confirmations are already activated.
@raise [RuntimeError] Occurs when nowait is true and block is given.
@param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.

@yield [basic_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
@yieldparam [AMQ::Protocol::Basic::Ack] basic_ack Protocol method class instance.

@return [self] self.

# File lib/amqp/channel.rb, line 1322
def on_ack(nowait = false, &block)
  self.define_callback(:ack, &block) if block

  self
end
on_connection_interruption(&block)

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/channel.rb, line 1231
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
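
The “only one callback” rule follows from the callback being redefined rather than appended. A minimal sketch of that behaviour, using a hypothetical @ToyCallbacks@ registry (the method names mirror those seen in the source listings, but this is not the gem's implementation):

```ruby
# Toy callback registry (hypothetical; not the AMQP gem's implementation).
class ToyCallbacks
  def initialize
    @callbacks = {}
  end

  # Replaces any callback previously registered for the event.
  def redefine_callback(event, &block)
    @callbacks[event] = block
    self
  end

  # Runs the callback for the event, if one is registered.
  def exec_callback(event, *args)
    callback = @callbacks[event]
    callback.call(*args) if callback
  end
end

calls    = []
registry = ToyCallbacks.new
registry.redefine_callback(:after_connection_interruption) { calls << :first }
registry.redefine_callback(:after_connection_interruption) { calls << :second }
registry.exec_callback(:after_connection_interruption)
calls # => [:second]
```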
on_error(&block)

Defines a callback that will be executed when channel is closed after channel-level exception.

@api public

# File lib/amqp/channel.rb, line 1109
def on_error(&block)
  self.define_callback(:error, &block)
end
on_nack(&block)

Registers an error callback for Basic.Nack. It’s called when one or more messages are rejected.

@return [self] self

# File lib/amqp/channel.rb, line 1333
def on_nack(&block)
  self.define_callback(:nack, &block) if block

  self
end
on_recovery(&block)

Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/channel.rb, line 1260
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
Also aliased as: after_recovery
once_open(&block)

Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker). If you need to delay an operation till the moment channel is open, this method is what you are looking for.

Multiple callbacks are supported. If the channel is already open when this method is called, the block is executed immediately.

@api public

# File lib/amqp/channel.rb, line 931
def once_open(&block)
  @channel_is_open_deferrable.callback do
    # guards against cases when deferred operations
    # don't complete before the channel is closed
    block.call if open?
  end
end
Also aliased as: once_opened
once_opened(&block)
Alias for: once_open
open(&block)

Opens AMQP channel.

@note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
@api public

# File lib/amqp/channel.rb, line 907
def open(&block)
  @connection.send_frame(AMQ::Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
  @connection.channels[@id] = self
  self.status = :opening

  self.redefine_callback :open, &block
end
Also aliased as: reopen
open?()

@return [Boolean] true if channel is open or is being opened (status is :opened or :opening).
@api public

# File lib/amqp/channel.rb, line 919
def open?
  self.status == :opened || self.status == :opening
end
prefetch(count, global = false, &block)

Sets the message prefetch count for this channel via {#qos}, once the channel is open.

@param [Fixnum] count Message count
@param [Boolean] global (false)

@return [Channel] self

@api public

# File lib/amqp/channel.rb, line 997
def prefetch(count, global = false, &block)
  self.once_open do
    # RabbitMQ does not support prefetch_size.
    self.qos(0, count, global, &block)

    @options[:prefetch] = count
  end

  self
end
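
What a prefetch count does can be pictured with a small model. @ToyPrefetchQueue@ is a hypothetical class for illustration only, and the sketch assumes a positive count: the (simulated) broker never has more than that many unacknowledged messages in flight to the consumer, and each acknowledgement frees a slot for the next delivery.

```ruby
# Toy model of a consumer prefetch window (no broker involved).
class ToyPrefetchQueue
  attr_reader :in_flight

  def initialize(messages, prefetch_count)
    @ready          = messages.dup
    @prefetch_count = prefetch_count
    @in_flight      = [] # delivered but not yet acknowledged
  end

  # Delivers messages until the prefetch window is full.
  def deliver
    while @in_flight.size < @prefetch_count && !@ready.empty?
      @in_flight << @ready.shift
    end
    self
  end

  # Acknowledging a message frees a slot, so one more can be delivered.
  def acknowledge(message)
    @in_flight.delete(message)
    deliver
  end
end

queue = ToyPrefetchQueue.new(%w[a b c d], 2).deliver
queue.in_flight        # => ["a", "b"]
queue.acknowledge("a")
queue.in_flight        # => ["b", "c"]
```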
publisher_index()

Publisher index is the index of the last message published since confirmations were activated, starting at 0. It’s incremented by 1 every time a message is published. This is done on both client and server, hence acknowledged messages can be matched via their delivery tags.

@return [Integer] Current publisher index.
@api public

# File lib/amqp/channel.rb, line 1284
def publisher_index
  @publisher_index ||= 0
end
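
The matching of acknowledgements to messages via the publisher index can be sketched as follows. @ToyConfirmingPublisher@ and @handle_ack@ are hypothetical names for this illustration, not the gem's API; the sketch assumes, as described above, that client and broker count published messages identically, so the delivery tag carried by an acknowledgement equals the index the client assigned at publish time.

```ruby
# Toy model of matching broker acknowledgements to published messages.
class ToyConfirmingPublisher
  attr_reader :publisher_index, :confirmed

  def initialize
    @publisher_index = 0  # no messages published yet
    @unconfirmed     = {} # index => payload
    @confirmed       = []
  end

  # The index is incremented on each publish, so the first
  # message published gets index 1.
  def publish(payload)
    @publisher_index += 1
    @unconfirmed[@publisher_index] = payload
    @publisher_index
  end

  # The delivery tag of an acknowledgement identifies the message
  # by the index it was given when published.
  def handle_ack(delivery_tag)
    payload = @unconfirmed.delete(delivery_tag)
    @confirmed << payload if payload
  end
end

publisher = ToyConfirmingPublisher.new
publisher.publish("first")   # => 1
publisher.publish("second")  # => 2
publisher.handle_ack(2)
publisher.confirmed          # => ["second"]
```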
qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)

Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.

@note RabbitMQ as of 2.3.1 does not support prefetch_size.
@api public

# File lib/amqp/channel.rb, line 1213
def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
  @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))

  self.redefine_callback :qos, &block
  self
end
queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)

Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for more information about queues.

To make the broker generate a queue name for you (a classic example is exclusive queues that are only used for a short period of time), pass an empty string as the name value. The queue will then get its name as soon as the broker’s response (queue.declare-ok) arrives. Note that in this case, a block is required.

As with exchanges, queue names starting with ‘amq.’ are reserved: such queues cannot be modified and the prefix should not be used by applications.

@example Declaring a queue in a mail delivery app using #queue without a block

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using direct exchange and routing key = "mail.delivery"
    queue = ch.queue("mail.delivery", :durable => true)
    queue.subscribe do |headers, payload|
      # ...
    end
  end
end

@example Declaring a server-named exclusive queue that receives all messages related to events, using a block.

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using amq.topic exchange with routing keys that begin with "events"
    ch.queue("", :exclusive => true) do |queue|
      queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload|
        # ...
      end
    end
  end
end

@param [String] name Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory).

See {Queue Queue class documentation} for discussion of queue lifecycles and when use of server-named queues
is optimal.

@option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not

already exist. The client can use this to check whether a queue
exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as

durable. Durable queues are recreated upon a server
restart (information about them is persisted). Non-durable (transient) queues
do not survive if/when a server restarts (information about them is stored exclusively
in RAM). Any remaining messages in the queue will be purged when the queue
is deleted regardless of the message's persistence setting.

@option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished

using it. The last consumer can be cancelled either explicitly or because
its channel is closed. If there was never a consumer on the queue,
it will not be deleted.

@option opts [Boolean] :exclusive (false) Exclusive queues may only be used by a single connection.

Exclusivity also implies that queue is automatically deleted when connection
is closed. Only one consumer is allowed to remove messages from exclusive queue.

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should

not wait for a reply method.  If the server could not complete the
method it will raise a channel or connection exception.

@yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.

@yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.

@yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP method (queue.declare-ok) instance.

@see Queue

@see Queue#initialize

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4)

@return [Queue] @api public

# File lib/amqp/channel.rb, line 848
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
  raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

  if name && !name.empty? && (queue = find_queue(name))
    extended_opts = Queue.add_default_options(name, opts, block)

    validate_parameters_match!(queue, extended_opts, :queue)

    block.call(queue) if block
    queue
  else
    self.queue!(name, opts, &block)
  end
end
queue!(name, opts = {}, &block) click to toggle source

Same as {#queue} but when queue with the same name already exists in this channel object’s cache, this method will replace existing queue with a newly defined one. Consider using {#queue} instead.

@see #queue

@return [Queue] @api public

# File lib/amqp/channel.rb, line 871
def queue!(name, opts = {}, &block)
  queue = if block.nil?
            Queue.new(self, name, opts)
          else
            shim = Proc.new { |q, method|
              if block.arity == 1
                block.call(q)
              else
                queue = find_queue(method.queue)
                block.call(queue, method.consumer_count, method.message_count)
              end
            }
            Queue.new(self, name, opts, &shim)
          end

  register_queue(queue)
end
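The shim in the source above adapts the caller's block to its arity: a one-argument block receives only the queue, while any other block also receives the consumer and message counts from queue.declare-ok. A minimal standalone sketch of that dispatch follows. `DeclareOk` and `adapt_callback` are hypothetical stand-ins, and the queue is passed through directly rather than looked up with find_queue.

```ruby
# Hypothetical stand-in for the queue.declare-ok method object.
DeclareOk = Struct.new(:queue, :consumer_count, :message_count)

# Wraps a user-supplied block so that it is called with as many
# arguments as it declares, mirroring the arity check in #queue!.
def adapt_callback(block)
  proc do |queue, declare_ok|
    if block.arity == 1
      block.call(queue)
    else
      block.call(queue, declare_ok.consumer_count, declare_ok.message_count)
    end
  end
end

declare_ok = DeclareOk.new("mail.delivery", 2, 5)

short = adapt_callback(proc { |q| [q] })
long  = adapt_callback(proc { |q, consumers, messages| [q, consumers, messages] })

short.call("mail.delivery", declare_ok) # the block sees only the queue
long.call("mail.delivery", declare_ok)  # the block also sees both counts
```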
queues() click to toggle source

@return [Array<AMQP::Queue>] Queues cache for this channel @api plugin @private

# File lib/amqp/channel.rb, line 892
def queues
  @queues
end
recover(requeue = true, &block) click to toggle source

Notifies the AMQP broker that the consumer has recovered and that unacknowledged messages need to be redelivered.

@return [Channel] self

@note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)

@see acknowledge

@api public

# File lib/amqp/channel.rb, line 1051
def recover(requeue = true, &block)
  @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))

  self.redefine_callback :recover, &block
  self
end
register_exchange(exchange) click to toggle source

Implementation

# File lib/amqp/channel.rb, line 1381
def register_exchange(exchange)
  raise ArgumentError, "argument is nil!" if exchange.nil?

  @exchanges[exchange.name] = exchange
end
register_queue(queue) click to toggle source

@api plugin @private

# File lib/amqp/channel.rb, line 1399
def register_queue(queue)
  raise ArgumentError, "argument is nil!" if queue.nil?

  @queues[queue.name] = queue
end
reject(delivery_tag, requeue = true, multi = false) click to toggle source

Reject a message with given delivery tag.

@api public

@see acknowledge

@see recover

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)

# File lib/amqp/channel.rb, line 1032
def reject(delivery_tag, requeue = true, multi = false)
  if multi
    @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue))
  else
    @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
  end

  self
end
reopen(&block) click to toggle source
Alias for: open
reset(&block) click to toggle source

Resets channel state (for example, list of registered queue objects and so on).

Most of the time, this method is not called by application code.

@private @api plugin

# File lib/amqp/channel.rb, line 1151
def reset(&block)
  # See AMQP::Channel
  self.reset_state!

  # there is no way to reset a deferrable; we have to use a new instance. MK.
  @channel_is_open_deferrable = AMQP::Deferrable.new
  @channel_is_open_deferrable.callback(&block)

  @connection.on_connection do
    @channel_is_open_deferrable.succeed

    self.prefetch(@options[:prefetch], false) if @options[:prefetch]
  end
end
reset_publisher_index!() click to toggle source

Resets publisher index to 0

@api plugin

# File lib/amqp/channel.rb, line 1291
def reset_publisher_index!
  @publisher_index = 0
end
reset_state!() click to toggle source

@api plugin @private

# File lib/amqp/channel.rb, line 1417
def reset_state!
  @flow_is_active                = true

  @queues_awaiting_declare_ok    = Array.new
  @exchanges_awaiting_declare_ok = Array.new
  @exchanges_awaiting_bind_ok    = Array.new
  @exchanges_awaiting_unbind_ok  = Array.new

  @queues_awaiting_delete_ok     = Array.new

  @exchanges_awaiting_delete_ok  = Array.new
  @queues_awaiting_purge_ok      = Array.new
  @queues_awaiting_bind_ok       = Array.new
  @queues_awaiting_unbind_ok     = Array.new
  @consumers_awaiting_consume_ok = Array.new
  @consumers_awaiting_cancel_ok  = Array.new

  @queues_awaiting_get_response  = Array.new

  @callbacks                     = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) }
  @uses_publisher_confirmations  = false
end
reuse() click to toggle source

Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens itself with this new id, releasing the old id after the new one is allocated.

This includes recovery of known exchanges, queues and bindings, exactly the same way as when the client recovers from a network failure.

@api public

# File lib/amqp/channel.rb, line 340
def reuse
  old_id = @id
  # must release after we allocate a new id, otherwise we will end up
  # with the same value. MK.
  @id    = @connection.next_channel_id
  @connection.release_channel_id(old_id)

  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end
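The comment in the source above explains why the new id is allocated before the old one is released. The sketch below illustrates that ordering with a hypothetical `ChannelIdPool` that always hands out the lowest free id. This allocation strategy is an assumption made for the illustration; the connection's real allocator may work differently.

```ruby
require "set"

# Hypothetical id pool that always returns the lowest free id.
class ChannelIdPool
  def initialize(max)
    @max  = max
    @used = Set.new
  end

  def next_id
    id = (1..@max).find { |candidate| !@used.include?(candidate) }
    raise "no free channel ids" if id.nil?

    @used << id
    id
  end

  def release(id)
    @used.delete(id)
    self
  end
end

# Allocate first, release second: the channel gets a genuinely new id.
pool   = ChannelIdPool.new(8)
old_id = pool.next_id
new_id = pool.next_id
pool.release(old_id)

# Release first, allocate second: the pool hands the old id straight back.
other      = ChannelIdPool.new(8)
previous   = other.next_id
other.release(previous)
reallocated = other.next_id
```

In the first case `new_id` differs from `old_id`; in the second, `reallocated` equals `previous`, which is the situation #reuse avoids.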
run_after_recovery_callbacks() click to toggle source

@private

# File lib/amqp/channel.rb, line 1266
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  @queues.each    { |name, q| q.run_after_recovery_callbacks }
  @exchanges.each { |name, e| e.run_after_recovery_callbacks }
end
run_before_recovery_callbacks() click to toggle source

@private

# File lib/amqp/channel.rb, line 1247
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)

  @queues.each    { |name, q| q.run_before_recovery_callbacks }
  @exchanges.each { |name, e| e.run_before_recovery_callbacks }
end
synchronize(&block) click to toggle source

Synchronizes given block using this channel’s mutex. @api public

# File lib/amqp/channel.rb, line 1202
def synchronize(&block)
  @mutex.synchronize(&block)
end
topic(name = 'amq.topic', opts = {}, &block) click to toggle source

Defines, initializes and returns a topic Exchange instance.

Learn more about topic exchanges in {Exchange Exchange class documentation}.

@param [String] name (amq.topic) Exchange name.

@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not

already exist. The client can use this to check whether an exchange
exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as

durable. Durable exchanges and their bindings are recreated upon a server
restart (information about them is persisted). Non-durable (transient) exchanges
do not survive if/when a server restarts (information about them is stored exclusively
in RAM).

@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished

using it. The server waits for a short period of time before
determining the exchange is unused to give time to the client code
to bind a queue to it.

@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but

only when bound to other exchanges. Internal exchanges are used to
construct wiring that is not visible to applications. This is a RabbitMQ-specific
extension.

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should

not wait for a reply method.  If the server could not complete the
method it will raise a channel or connection exception.

@example Using topic exchange to deliver relevant news updates

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
    puts "A new dev post: '#{payload}'"
  end
  channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
    puts "A new post about Ruby: '#{payload}'"
  end

  # Let's publish some data.
  exchange.publish "Ruby post",     :routing_key => "technology.dev.ruby"
  exchange.publish "Erlang post",   :routing_key => "technology.dev.erlang"
  exchange.publish "Sinatra post",  :routing_key => "technology.web.ruby"
  exchange.publish "Jewelery post", :routing_key => "jewelery.ruby"
end

@example Using topic exchange to deliver geographically-relevant data

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
    puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
    puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
    puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
    puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
    puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
  end

  exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
    publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
    publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
    publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
    publish("São Paulo update",        :routing_key => "americas.south.brazil.saopaulo").
    publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
    publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
    publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
    publish("Rome update",             :routing_key => "europe.italy.rome").
    publish("Paris update",            :routing_key => "europe.france.paris")
end

@see Exchange

@see Exchange#initialize

@see www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards

@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)

@return [Exchange] @api public

# File lib/amqp/channel.rb, line 651
def topic(name = 'amq.topic', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:topic, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :topic, name, opts, &block))
  end
end
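To make the wildcard rules used in the examples above concrete, here is a small standalone matcher. It is only a sketch of the topic matching rules (words are separated by dots, `*` matches exactly one word, `#` matches zero or more words) and is not how the broker or the gem implements routing. The function names are hypothetical.

```ruby
# Returns true if routing_key matches the topic binding pattern.
def topic_match?(pattern, routing_key)
  topic_words_match?(pattern.split("."), routing_key.split("."))
end

# Recursive helper working on arrays of words.
def topic_words_match?(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # '#' may absorb zero or more words; try every possible split
    (0..key_words.length).any? { |n| topic_words_match?(rest, key_words.drop(n)) }
  when "*"
    # '*' must consume exactly one word
    !key_words.empty? && topic_words_match?(rest, key_words.drop(1))
  else
    key_words.first == head && topic_words_match?(rest, key_words.drop(1))
  end
end

bindings = ["technology.dev.#", "technology.#.ruby"]
keys     = ["technology.dev.ruby", "technology.dev.erlang",
            "technology.web.ruby", "jewelery.ruby"]

keys.each do |key|
  matched = bindings.select { |pattern| topic_match?(pattern, key) }
  puts "#{key} -> #{matched.inspect}"
end
```

Under these rules, "technology.dev.ruby" matches both bindings, "technology.dev.erlang" only the first, "technology.web.ruby" only the second, and "jewelery.ruby" neither.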
tx_commit(&block) click to toggle source

Commits AMQP transaction.

@api public

# File lib/amqp/channel.rb, line 1079
def tx_commit(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))

  self.redefine_callback :tx_commit, &block
  self
end
tx_rollback(&block) click to toggle source

Rolls AMQP transaction back.

@api public

# File lib/amqp/channel.rb, line 1089
def tx_rollback(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))

  self.redefine_callback :tx_rollback, &block
  self
end
tx_select(&block) click to toggle source

Sets the channel to use standard transactions. One must use this method at least once on a channel before using the #tx_commit or #tx_rollback methods.

@api public

# File lib/amqp/channel.rb, line 1069
def tx_select(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))

  self.redefine_callback :tx_select, &block
  self
end
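The relationship between #tx_select, #tx_commit and #tx_rollback can be pictured with a toy in-memory model. It is only a sketch of the transactional semantics (messages published inside a transaction take effect on commit and are discarded on rollback) and does not use the gem or a broker. The `TxModel` class is hypothetical.

```ruby
# Toy model of a transactional channel (hypothetical, not the gem's code).
class TxModel
  attr_reader :delivered

  def initialize
    @transactional = false
    @pending       = [] # published but not yet committed
    @delivered     = [] # visible to consumers
  end

  def tx_select
    @transactional = true
    self
  end

  def publish(message)
    if @transactional
      @pending << message
    else
      @delivered << message
    end
    self
  end

  def tx_commit
    raise "tx_select must be called first" unless @transactional

    @delivered.concat(@pending)
    @pending.clear
    self
  end

  def tx_rollback
    raise "tx_select must be called first" unless @transactional

    @pending.clear
    self
  end
end

channel = TxModel.new.tx_select
channel.publish("a").publish("b").tx_commit # "a" and "b" become visible
channel.publish("c").tx_rollback            # "c" is discarded
```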
uses_publisher_confirmations?() click to toggle source

@return [Boolean]

# File lib/amqp/channel.rb, line 1306
def uses_publisher_confirmations?
  @uses_publisher_confirmations
end

Protected Instance Methods

validate_parameters_match!(entity, parameters, type) click to toggle source
# File lib/amqp/channel.rb, line 1545
def validate_parameters_match!(entity, parameters, type)
  unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive]
    raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
  end
end
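The check above compares only a fixed set of options between the cached entity and the new request, and it is skipped entirely for passive declarations. A minimal standalone sketch of the same idea follows. `Entity`, `IncompatibleOptions`, `CHECKED_KEYS` and `validate_match!` are hypothetical stand-ins; the key list is the one used for queues in the constructor's @parameter_checks.

```ruby
# Hypothetical stand-ins for the gem's entity and error classes.
class IncompatibleOptions < StandardError; end

Entity = Struct.new(:name, :opts)

# Options compared for queues, as listed in @parameter_checks.
CHECKED_KEYS = [:durable, :exclusive, :auto_delete, :arguments].freeze

# Raises unless the requested options agree with the cached entity
# on every checked key. Passive requests are never rejected.
def validate_match!(entity, requested)
  same = entity.opts.values_at(*CHECKED_KEYS) == requested.values_at(*CHECKED_KEYS)
  return if same || requested[:passive]

  raise IncompatibleOptions,
        "#{entity.name}: declared with #{entity.opts.inspect}, requested #{requested.inspect}"
end

cached = Entity.new("mail.delivery", { :durable => true, :exclusive => false })

validate_match!(cached, { :durable => true, :exclusive => false })  # passes
validate_match!(cached, { :durable => false, :passive => true })    # passes: passive
begin
  validate_match!(cached, { :durable => false, :exclusive => false })
rescue IncompatibleOptions => e
  puts "rejected: #{e.message}"
end
```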

Public Class Methods

new(connection = nil, id = nil, options = {}, &block) click to toggle source

@param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP

connection (accessible via {AMQP.connection}) will be used.

@param [Integer] id Channel id. Must not be greater than the max channel id that client and broker

negotiated during connection setup. Almost always the right thing to do
is to let AMQP gem pick channel identifier for you.

@param [Hash] options A hash of options

@example Instantiating a channel for default connection (accessible as AMQP.connection)

AMQP.start do |connection|
  # no connection argument is given, so the default connection is used
  AMQP::Channel.new do |channel, open_ok|
    # channel is ready: set up your messaging flow by creating exchanges,
    # queues, binding them together and so on.
  end
end

@example Instantiating a channel for explicitly given connection

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel, open_ok|
    # ...
  end
end

@example Instantiating a channel with a :prefetch option

AMQP.connect do |connection|
  AMQP::Channel.new(connection, :prefetch => 5) do |channel, open_ok|
    # ...
  end
end

@option options [Integer] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {#prefetch}.

@option options [Boolean] :auto_recovery (nil) Turns on automatic network failure recovery mode for this channel.

@yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.

@yieldparam [Channel] channel Channel that is successfully open

@yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP method (channel.open-ok) instance

@see #prefetch @api public

# File lib/amqp/channel.rb, line 232
def initialize(connection = nil, id = nil, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  @connection = connection || AMQP.connection || AMQP.start
  # this means 2nd argument is options
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = @connection.next_channel_id
  end

  super(@connection)

  @id        = id || @connection.next_channel_id
  @exchanges = Hash.new
  @queues    = Hash.new
  @consumers = Hash.new
  @options       = { :auto_recovery => @connection.auto_recovering? }.merge(options)
  @auto_recovery = (!!@options[:auto_recovery])

  # we must synchronize frameset delivery. MK.
  @mutex     = Mutex.new

  reset_state!

  # 65536 is here for cases when channel is opened without passing a callback in,
  # otherwise channel_max would be nil and it causes a lot of needless headaches.
  # let's just have this default. MK.
  channel_max = if @connection.open?
                  @connection.channel_max || 65536
                else
                  65536
                end

  if channel_max != 0 && !(0..channel_max).include?(@id)
    raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
  end

  # we need this deferrable to mimic what AMQP gem 0.7 does to enable
  # the following (pseudo-synchronous) style of programming some people use in their
  # existing codebases:
  #
  # connection = AMQP.connect
  # channel    = AMQP::Channel.new(connection)
  # queue      = AMQP::Queue.new(channel)
  #
  # ...
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQP::Deferrable.new

  @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

  # only send channel.open when connection is actually open. Makes it possible to
  # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        case block.arity
        when 1 then block.call(ch)
        else block.call(ch, open_ok)
        end # case
      end # if

      self.prefetch(@options[:prefetch], false) if @options[:prefetch]
    end # self.open
  end # @connection.on_connection
end
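The pseudo-synchronous style mentioned in the constructor's comments relies on operations being queued until the channel is open and then run in FIFO order. The sketch below shows that pattern in isolation. `PendingOperations` and its methods are hypothetical and only imitate the role the deferrable plays here; they are not the gem's or EventMachine's API.

```ruby
# Hypothetical illustration of "run now if open, otherwise queue until open".
class PendingOperations
  def initialize
    @open    = false
    @pending = []
  end

  # Runs the operation immediately if the channel is open,
  # otherwise stores it for later.
  def once_open(&operation)
    if @open
      operation.call
    else
      @pending << operation
    end
    self
  end

  # Called when the broker confirms the channel is open:
  # flushes queued operations in the order they were registered.
  def succeed
    @open = true
    @pending.shift.call until @pending.empty?
    self
  end
end

log  = []
gate = PendingOperations.new

gate.once_open { log << :declare_exchange }
gate.once_open { log << :declare_queue }
# nothing has run yet: the channel is not open

gate.succeed
# both operations have now run, in registration order

gate.once_open { log << :publish }
# runs immediately because the channel is already open
```

This also shows where the race conditions of pseudo-synchronous mode come from: code that follows a queued operation runs before the operation itself.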