def push_data_from_table(stream, progress)
loop do
if exiting?
store_session
exit 0
end
row_size = 0
chunksize = stream.state[:chunksize]
begin
chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
stream.state[:chunksize] = c.to_i
encoded_data, row_size, elapsed_time = nil
d1 = c.time_delta do
encoded_data, row_size, elapsed_time = stream.fetch
end
break if stream.complete?
data = nil
d2 = c.time_delta do
data = {
:state => stream.to_hash,
:checksum => Taps::Utils.checksum(encoded_data).to_s
}
end
begin
content, content_type = nil
d3 = c.time_delta do
content, content_type = Taps::Multipart.create do |r|
r.attach :name => :encoded_data,
:payload => encoded_data,
:content_type => 'application/octet-stream'
r.attach :name => :json,
:payload => OkJson.encode(data),
:content_type => 'application/json'
end
end
session_resource['push/table'].post(content, http_headers(:content_type => content_type))
self.stream_state = stream.to_hash
rescue => e
Taps::Utils.reraise_server_exception(e)
end
c.idle_secs = (d1 + d2 + d3)
elapsed_time
end
rescue Taps::CorruptedData => e
next
rescue Taps::DuplicatePrimaryKeyError => e
stream = stream.verify_remote_stream(session_resource['push/verify_stream'], http_headers)
next
end
stream.state[:chunksize] = chunksize
progress.inc(row_size)
stream.increment(row_size)
break if stream.complete?
end
progress.finish
completed_tables << stream.table_name.to_s
self.stream_state = {}
end