Class Raindrops::Aggregate::PMQ
In: lib/raindrops/aggregate/pmq.rb
Parent: Object

\Aggregate + POSIX message queues support for Ruby 1.9 and \Linux

This class is duck-type compatible with \Aggregate and allows us to aggregate and share statistics from multiple processes/threads with the aid of POSIX message queues. It is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible runtimes.

Unlike the core of raindrops, this is only supported on Ruby 1.9 and Linux 2.6. Using this class requires the following additional RubyGems or libraries:

  • aggregate (tested with 0.2.2)
  • io-extra (tested with 1.2.3)
  • posix_mq (tested with 1.0.0)

Design

There is one master thread which aggregates statistics. Individual worker processes or threads will write to a shared POSIX message queue (default: "/raindrops") that the master reads from. At a predefined interval, the master thread will write out to a shared, anonymous temporary file that workers may read from.
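The flow described above can be sketched with Ruby's standard library alone. This is only an in-process illustration, not the raindrops implementation: a +Queue+ stands in for the POSIX message queue, a plain array of samples stands in for the \Aggregate object, the file is read back by path rather than being anonymous, and the names +run_master+, +write_snapshot+ and +read_snapshot+ are hypothetical. Using +nil+ as a stop message is likewise an assumption of the sketch.

```ruby
require "tempfile"
require "thread"

# Replace the file contents with a serialized copy of the samples.
def write_snapshot(snapshot, samples)
  snapshot.rewind
  snapshot.truncate(0)
  snapshot.write(Marshal.dump(samples))
  snapshot.flush
end

# Master side: drain the queue, collect samples, and publish a snapshot
# to the shared file every `master_interval` messages.
# A nil message ends the loop (an assumption of this sketch).
def run_master(queue, snapshot, master_interval)
  samples = []
  pending = 0
  while (batch = queue.pop)
    samples.concat(batch)
    pending += 1
    if pending >= master_interval
      write_snapshot(snapshot, samples)
      pending = 0
    end
  end
  write_snapshot(snapshot, samples) # final flush before exiting
end

# Worker side: load whatever the master last published.
def read_snapshot(path)
  data = File.binread(path)
  data.empty? ? [] : Marshal.load(data)
end

queue = Queue.new
snapshot = Tempfile.new("raindrops-sketch")
snapshot.binmode
master = Thread.new { run_master(queue, snapshot, 2) }

queue << [1, 2, 3]   # one worker sends a batch
queue << [4, 5]      # another worker sends a batch
queue << nil         # ask the master to stop
master.join

shared = read_snapshot(snapshot.path)
```

Here the snapshot is read only after the master thread has finished, so no locking is shown; a design where workers read concurrently would need some form of synchronization around the file.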

Setting +:worker_interval+ and +:master_interval+ to +1+ will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
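To make the trade-off concrete, the following sketch counts how many messages a worker would send for the same samples at different intervals. +BatchingWorker+ is a hypothetical class written for this illustration, with an array standing in for the message queue. It assumes that the interval counts the samples buffered before a send, which the text above does not spell out.

```ruby
# Hypothetical worker that buffers samples locally and sends one
# message per `interval` samples. `sent` stands in for the message queue.
class BatchingWorker
  attr_reader :sent

  def initialize(interval)
    @interval = interval
    @buffer = []
    @sent = []
  end

  def <<(sample)
    @buffer << sample
    flush if @buffer.size >= @interval
    self
  end

  # Sends any buffered samples as a single message.
  def flush
    return if @buffer.empty?
    @sent << @buffer.dup
    @buffer.clear
  end
end

precise = BatchingWorker.new(1)
batched = BatchingWorker.new(10)
25.times do |i|
  precise << i
  batched << i
end

precise_messages = precise.sent.size   # one message per sample
batched_messages = batched.sent.size   # full batches only; the rest is still buffered
batched.flush                          # an explicit flush sends the remainder
```

With an interval of 1 the master sees every sample as soon as it is recorded, at the price of one message per sample. With an interval of 10 far fewer messages are sent, but samples still sitting in the buffer are invisible to the master until the next flush.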

Methods

<<   aggregate   count   each   each_nonzero   flush   flush_master   master_loop   max   mean   min   new   outliers_high   outliers_low   std_dev   stop_master_loop   sum   to_s  

Attributes

nr_dropped  [R]  returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy
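The counting behaviour behind nr_dropped can be illustrated with a bounded in-process queue. This is a sketch under assumptions, not the library's code: +SizedQueue+ stands in for a POSIX message queue that has filled up, and +LossySender+ and +send_message+ are hypothetical names.

```ruby
require "thread"

# Hypothetical sender that never blocks: when the queue is full the
# message is discarded and a counter is incremented instead.
class LossySender
  attr_reader :nr_dropped

  def initialize(queue)
    @queue = queue
    @nr_dropped = 0
  end

  # Returns true if the message was queued, false if it was dropped.
  def send_message(message)
    @queue.push(message, true)   # non-blocking push raises ThreadError when full
    true
  rescue ThreadError
    @nr_dropped += 1
    false
  end
end

queue = SizedQueue.new(2)        # room for two messages; nobody is reading
sender = LossySender.new(queue)
results = 5.times.map { |i| sender.send_message(i) }
```

A monitoring hook could compare the counter against the number of samples recorded to judge whether the master is keeping up.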

Public Class methods

Creates a new Raindrops::Aggregate::PMQ object

  Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate

options is a hash that accepts the following keys:

  • :queue - name of the POSIX message queue (default: "/raindrops")
  • :worker_interval - interval to send to the master (default: 10)
  • :master_interval - interval for the master to write out (default: 5)
  • :lossy - workers drop packets if master cannot keep up (default: false)
  • :aggregate - \Aggregate object (default: \Aggregate.new)
  • :mq_umask - umask for creating the POSIX message queue (default: 0666)
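As an illustration of the keys above, a setup that favours accuracy over throughput might be configured as follows. The queue name "/myapp_stats" and the helper +build_pmq+ are made up for this example, and the +require+ paths are an assumption based on the file location given at the top of this page. The constructor call can only succeed on a supported platform with the additional gems installed, so the sketch defers it to a method instead of running it immediately.

```ruby
# Example settings; every key is one of the documented options.
PMQ_OPTIONS = {
  :queue           => "/myapp_stats", # hypothetical queue name
  :worker_interval => 1,              # shortest worker interval, for accuracy
  :master_interval => 1,              # shortest master interval, for accuracy
  :lossy           => true            # drop rather than wait if the master falls behind
}

# Hypothetical helper: loads the library lazily so that merely defining
# the options does not require Linux or the extra gems.
def build_pmq(options = PMQ_OPTIONS)
  require "raindrops"
  require "raindrops/aggregate/pmq"   # assumed path, from "In:" above
  Raindrops::Aggregate::PMQ.new(options)
end
```

Keys that are left out, such as +:aggregate+ and +:mq_umask+ here, fall back to the defaults listed above.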

Public Instance methods

<< - adds a sample to the underlying \Aggregate object

aggregate - Loads the last shared \Aggregate from the master thread/process

count - proxy for \Aggregate#count

each - proxy for \Aggregate#each

each_nonzero - proxy for \Aggregate#each_nonzero

flush - flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly as +:worker_interval+ defines how frequently your queue will be flushed.

flush_master - Flushes the currently aggregated statistics to a temporary file. There is no need to call this explicitly as +:master_interval+ defines how frequently your data will be flushed for workers to read.

master_loop - Starts running a master loop, usually in a dedicated thread or process:

  Thread.new { agg.master_loop }

Any worker can call +agg.stop_master_loop+ to stop the master loop (possibly causing the thread or process to exit).

max - proxy for \Aggregate#max

mean - proxy for \Aggregate#mean

min - proxy for \Aggregate#min

outliers_high - proxy for \Aggregate#outliers_high

outliers_low - proxy for \Aggregate#outliers_low

std_dev - proxy for \Aggregate#std_dev

stop_master_loop - stops the currently running master loop. It may be called from any worker thread or process.

sum - proxy for \Aggregate#sum

to_s - proxy for \Aggregate#to_s
