# File lib/taps/operation.rb, line 480
  # Sends the contents of +stream+ to the session's 'push/table' resource,
  # one fetched chunk per loop pass, until the stream reports completion.
  #
  # Every pass:
  # * stops the process (after store_session) when exiting? is true;
  # * fetches a chunk inside Taps::Utils.calculate_chunksize, timing the
  #   fetch, the metadata build and the multipart build with time_delta and
  #   reporting their sum through idle_secs=;
  # * posts a multipart body holding the encoded rows plus a JSON part with
  #   the stream state and a checksum of the encoded rows;
  # * stores the chunk size returned by calculate_chunksize in the stream
  #   state and advances both +progress+ and +stream+ by the fetched rows.
  #
  # stream   - object providing state, fetch, complete?, to_hash, increment,
  #            verify_remote_stream and table_name.
  # progress - object receiving inc(rows) per pass and finish at the end.
  #
  # Once the stream is complete the table name is appended to
  # completed_tables and stream_state is reset to an empty Hash, which is
  # also the method's return value.
  def push_data_from_table(stream, progress)
    loop do
      # An exit was requested: persist the session, then leave the process.
      if exiting?
        store_session
        exit(0)
      end

      rows_fetched = 0
      tuned_chunksize = stream.state[:chunksize]

      begin
        tuned_chunksize = Taps::Utils.calculate_chunksize(tuned_chunksize) do |sizer|
          stream.state[:chunksize] = sizer.to_i

          encoded_rows = rows_fetched = fetch_elapsed = nil
          fetch_secs = sizer.time_delta do
            encoded_rows, rows_fetched, fetch_elapsed = stream.fetch
          end
          # Nothing left to send; this abandons the calculate_chunksize call.
          break if stream.complete?

          meta = nil
          meta_secs = sizer.time_delta do
            snapshot = stream.to_hash
            digest = Taps::Utils.checksum(encoded_rows).to_s
            meta = { :state => snapshot, :checksum => digest }
          end

          begin
            body = mime_type = nil
            build_secs = sizer.time_delta do
              body, mime_type = Taps::Multipart.create do |form|
                form.attach(
                  :name => :encoded_data,
                  :payload => encoded_rows,
                  :content_type => 'application/octet-stream'
                )
                form.attach(
                  :name => :json,
                  :payload => OkJson.encode(meta),
                  :content_type => 'application/json'
                )
              end
            end

            push_endpoint = session_resource['push/table']
            push_endpoint.post(body, http_headers(:content_type => mime_type))
            # Remember the state that was just posted.
            self.stream_state = stream.to_hash
          rescue => e
            Taps::Utils.reraise_server_exception(e)
          end

          sizer.idle_secs = fetch_secs + meta_secs + build_secs

          # The block's value is the elapsed time reported by stream.fetch.
          fetch_elapsed
        end
      rescue Taps::CorruptedData
        # The data got corrupted somehow; run the pass again for the same data.
        next
      rescue Taps::DuplicatePrimaryKeyError
        # Verify the stream, continue with whatever it returns, and retry.
        verify_endpoint = session_resource['push/verify_stream']
        stream = stream.verify_remote_stream(verify_endpoint, http_headers)
        next
      end

      stream.state[:chunksize] = tuned_chunksize
      progress.inc(rows_fetched)
      stream.increment(rows_fetched)

      break if stream.complete?
    end

    progress.finish
    completed_tables << stream.table_name.to_s
    self.stream_state = {}
  end