A buffered I/O class which fits into the Rev Watcher framework. It provides both an observer which reads data as it’s received from the wire and a buffered write watcher which stores data and writes it out each time the socket becomes writable.
This class is primarily meant as a base class for other streams which need non-blocking writing, and is used to implement Rev’s Socket class and its associated subclasses.
INPUT_SIZE: maximum number of bytes to consume at once
Attach to the event loop
# File lib/rev/io.rb, line 34
def attach(loop); @_read_watcher.attach loop; schedule_write if !@_write_buffer.empty?; self; end
Is the watcher attached?
# File lib/rev/io.rb, line 46
def attached?; @_read_watcher.attached?; end
Close the IO stream
# File lib/rev/io.rb, line 87
def close
  detach if attached?
  detach_write_watcher
  @_io.close unless @_io.closed?
  on_close
  nil
end
Is the IO object closed?
# File lib/rev/io.rb, line 97
def closed?
  @_io.nil? or @_io.closed?
end
Detach from the event loop
# File lib/rev/io.rb, line 37
def detach; @_read_watcher.detach; self; end
Disable the watcher
# File lib/rev/io.rb, line 43
def disable; @_read_watcher.disable; self; end
Enable the watcher
# File lib/rev/io.rb, line 40
def enable; @_read_watcher.enable; self; end
Is the watcher enabled?
# File lib/rev/io.rb, line 49
def enabled?; @_read_watcher.enabled?; end
Obtain the event loop associated with this object
# File lib/rev/io.rb, line 52
def evloop; @_read_watcher.evloop; end
Called whenever the IO object is closed, whether it hit EOF, the connection was reset, or #close was called directly
# File lib/rev/io.rb, line 67
def on_close; end
Called whenever the IO object receives data
# File lib/rev/io.rb, line 59
def on_read(data); end
Called whenever a write completes and the output buffer is empty
# File lib/rev/io.rb, line 63
def on_write_complete; end
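The three callbacks above are empty hooks that subclasses are expected to override. The sketch below illustrates that pattern without depending on Rev itself: StubStream, simulate, and LineCounter are hypothetical names invented for this example, and no real I/O or event loop is involved.

```ruby
# StubStream is a hypothetical stand-in for the base class. It only shows how
# the empty hook methods are meant to be overridden; it performs no real I/O.
class StubStream
  def on_read(data); end
  def on_write_complete; end
  def on_close; end

  # Pretend to be the event loop: deliver each chunk, then close the stream.
  def simulate(chunks)
    chunks.each { |chunk| on_read(chunk) }
    on_write_complete
    on_close
  end
end

# A subclass overrides only the hooks it cares about.
class LineCounter < StubStream
  attr_reader :lines, :events

  def initialize
    @lines = 0
    @events = []
  end

  # Data may arrive split at arbitrary points, so count newlines per chunk.
  def on_read(data)
    @lines += data.count("\n")
    @events << :read
  end

  def on_close
    @events << :close
  end
end

counter = LineCounter.new
counter.simulate(["a\nb", "\nc\n"])
```

Because on_write_complete is not overridden in LineCounter, the inherited no-op runs and nothing is recorded for it.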
Number of bytes currently in the output buffer
# File lib/rev/io.rb, line 82
def output_buffer_size
  @_write_buffer.size
end
Write data in a buffered, non-blocking manner
# File lib/rev/io.rb, line 75
def write(data)
  @_write_buffer << data
  schedule_write
  data.size
end
# File lib/rev/io.rb, line 151
def detach_write_watcher
  @_write_watcher.detach if @_write_watcher and @_write_watcher.attached?
end
# File lib/rev/io.rb, line 147
def disable_write_watcher
  @_write_watcher.disable if @_write_watcher and @_write_watcher.enabled?
end
# File lib/rev/io.rb, line 139
def enable_write_watcher
  if @_write_watcher.attached?
    @_write_watcher.enable unless @_write_watcher.enabled?
  else
    @_write_watcher.attach(evloop)
  end
end
Read from the input buffer and dispatch to #on_read
# File lib/rev/io.rb, line 106
def on_readable
  begin
    on_read @_io.read_nonblock(INPUT_SIZE)
  rescue Errno::EAGAIN
  rescue Errno::ECONNRESET, EOFError
    close
  end
end
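The three outcomes that on_readable distinguishes (data, nothing available yet, end of stream) can be reproduced with a plain pipe from the standard library. This is only a sketch: CHUNK_SIZE and read_chunk are hypothetical names, and the example rescues IO::WaitReadable, which current Ruby versions mix into the EAGAIN error raised by read_nonblock.

```ruby
# Hypothetical chunk size for this sketch; it is not Rev's INPUT_SIZE.
CHUNK_SIZE = 16

# Read one chunk without blocking. Returns the data, :wait when nothing is
# available yet, or :closed after EOF or a connection reset.
def read_chunk(io)
  io.read_nonblock(CHUNK_SIZE)
rescue IO::WaitReadable
  :wait
rescue Errno::ECONNRESET, EOFError
  io.close unless io.closed?
  :closed
end

reader, writer = IO.pipe
results = []
results << read_chunk(reader)  # nothing has been written yet
writer.write("hello")
results << read_chunk(reader)  # data is available
writer.close
results << read_chunk(reader)  # the writer is gone, so this hits EOF
```

In an event-driven setting the :wait case is simply ignored, since the loop will signal again once the descriptor becomes readable.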
Write the contents of the output buffer
# File lib/rev/io.rb, line 116
def on_writable
  begin
    @_write_buffer.write_to(@_io)
  rescue Errno::EPIPE, Errno::ECONNRESET
    return close
  end

  if @_write_buffer.empty?
    disable_write_watcher
    on_write_complete
  end
end
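The write path (queue data in write, then drain the buffer whenever the descriptor is writable) can be approximated with the standard library alone. In this sketch BufferedWriter is a hypothetical class, a String stands in for the write buffer, and IO.select stands in for the write watcher; it is not how Rev implements its buffer.

```ruby
# Hypothetical stand-in for the buffered write path described above.
class BufferedWriter
  attr_reader :completed

  def initialize(io)
    @io = io
    @buffer = String.new
    @completed = 0  # how many times the buffer was fully drained
  end

  # Queue data without touching the descriptor and return its size.
  def write(data)
    @buffer << data
    data.size
  end

  def output_buffer_size
    @buffer.bytesize
  end

  # Write as much as the descriptor accepts and keep the remainder queued.
  def on_writable
    written = @io.write_nonblock(@buffer)
    @buffer = @buffer.byteslice(written..-1)
    @completed += 1 if @buffer.empty?
  rescue IO::WaitWritable
    # Not writable after all; retry on the next readiness notification.
  end
end

reader, writer = IO.pipe
out = BufferedWriter.new(writer)
returned = out.write("hello ") + out.write("world")
pending = out.output_buffer_size

# Drain the buffer, waiting for writability before each attempt.
until out.output_buffer_size.zero?
  IO.select(nil, [writer])
  out.on_writable
end
received = reader.read_nonblock(64)
```

Keeping the unwritten remainder in the buffer is what lets callers treat write as always succeeding immediately, at the cost of unbounded memory growth if the peer never reads.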
Schedule a write to be performed when the IO object becomes writable
# File lib/rev/io.rb, line 130
def schedule_write
  return unless @_io # this would mean 'we are still pre DNS here'
  return unless attached? # this would mean 'currently unattached' -- ie still pre DNS, or just plain not attached, which is ok
  begin
    enable_write_watcher
  rescue IOError
  end
end
# File lib/rev/io.rb, line 22
def initialize(io)
  @_io = io
  @_write_buffer ||= ::IO::Buffer.new
  @_read_watcher = Watcher.new(io, self, :r)
  @_write_watcher = Watcher.new(io, self, :w)
end