def connect
@auto_reconnect = true
@connection = EM.connect(@host, @port, Connection, @host, @port)
@connection.on(:closed) do
cancel_inactivity_checks
if @connected
@defs.each { |d| d.fail(Error.new("Redis disconnected")) }
@defs = []
@deferred_status = nil
@connected = false
if @auto_reconnect
EM.next_tick { reconnect }
end
emit(:disconnected)
EM::Hiredis.logger.info("#{@connection} Disconnected")
else
if @auto_reconnect
@reconnect_failed_count += 1
@reconnect_timer = EM.add_timer(EM::Hiredis.reconnect_timeout) {
@reconnect_timer = nil
reconnect
}
emit(:reconnect_failed, @reconnect_failed_count)
EM::Hiredis.logger.info("#{@connection} Reconnect failed")
if @reconnect_failed_count >= 4
emit(:failed)
self.fail(Error.new("Could not connect after 4 attempts"))
end
end
end
end
@connection.on(:connected) do
@connected = true
@reconnect_failed_count = 0
@failed = false
auth(@password) if @password
select(@db) unless @db == 0
@command_queue.each do |df, command, args|
@connection.send_command(command, args)
@defs.push(df)
end
@command_queue = []
schedule_inactivity_checks
emit(:connected)
EM::Hiredis.logger.info("#{@connection} Connected")
succeed
if @reconnecting
@reconnecting = false
emit(:reconnected)
end
end
@connection.on(:message) do |reply|
if RuntimeError === reply
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
error = RedisError.new(reply.message)
error.redis_error = reply
deferred.fail(error) if deferred
else
@inactive_seconds = 0
handle_reply(reply)
end
end
@connected = false
@reconnecting = false
return self
end