Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use IO.copy_stream when possible #383

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/httpclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,8 @@ def redirect_uri_callback=(redirect_uri_callback)
# use get method. get returns HTTP::Message as a response and you need to
# follow HTTP redirect by yourself if you need.
def get_content(uri, *args, &block)
query, header = keyword_argument(args, :query, :header)
success_content(follow_redirect(:get, uri, query, nil, header || {}, &block))
query, header, to = keyword_argument(args, :query, :header, :to)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can do better in terms of API, but it seemed logical to expose this optimized code path through get_content only.

So: client.get_content('/big-file.bin', to: '/tmp/big-file.bin')

success_content(follow_redirect(:get, uri, query, nil, header || {}, to, &block))
end

# Posts a content.
Expand Down Expand Up @@ -995,7 +995,7 @@ def hashy_argument_has_keys(args, *key)
key.all? { |e| args[0].key?(e) }
end

def do_request(method, uri, query, body, header, &block)
def do_request(method, uri, query, body, header, to = nil, &block)
res = nil
if HTTP::Message.file?(body)
pos = body.pos rescue nil
Expand All @@ -1016,7 +1016,7 @@ def do_request(method, uri, query, body, header, &block)
# We want to delete Connection usage in do_get_block but Newrelic gem depends on it.
# https://github.com/newrelic/rpm/blob/master/lib/new_relic/agent/instrumentation/httpclient.rb#L34-L36
conn = Connection.new
res = do_get_block(req, proxy, conn, &block)
res = do_get_block(req, proxy, conn, to, &block)
# Webmock's do_get_block returns ConditionVariable
if !res.respond_to?(:previous)
res = conn.pop
Expand Down Expand Up @@ -1085,7 +1085,7 @@ def adapt_block(&block)
proc { |r, str| block.call(str) }
end

def follow_redirect(method, uri, query, body, header, &block)
def follow_redirect(method, uri, query, body, header, to = nil, &block)
uri = to_resource_url(uri)
if block
b = adapt_block(&block)
Expand All @@ -1101,7 +1101,7 @@ def follow_redirect(method, uri, query, body, header, &block)
request_query = query
while retry_number < @follow_redirect_count
body.pos = pos if pos
res = do_request(method, uri, request_query, body, header, &filtered_block)
res = do_request(method, uri, request_query, body, header, to, &filtered_block)
res.previous = previous
if res.redirect?
if res.header['location'].empty?
Expand Down Expand Up @@ -1226,7 +1226,7 @@ def no_proxy?(uri)

# !! CAUTION !!
# Method 'do_get*' runs under MT condition. Be careful when changing it.
def do_get_block(req, proxy, conn, &block)
def do_get_block(req, proxy, conn, to = nil, &block)
@request_filter.each do |filter|
filter.filter_request(req)
end
Expand All @@ -1244,7 +1244,8 @@ def do_get_block(req, proxy, conn, &block)
@debug_dev << "\n\n= Response\n\n" if @debug_dev
do_get_header(req, res, sess)
conn.push(res)
sess.get_body do |part|

sess.get_body(to) do |part|
set_encoding(part, res.body_encoding)
if block
block.call(res, part.dup)
Expand Down
139 changes: 77 additions & 62 deletions lib/httpclient/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@


class HTTPClient


# Represents a Site: protocol scheme, host String and port Number.
class Site
# Protocol scheme.
Expand Down Expand Up @@ -574,29 +572,31 @@ def eof?
end
end

def get_body(&block)
begin
read_header if @state == :META
return nil if @state != :DATA
if @transparent_gzip_decompression
block = content_inflater_block(@content_encoding, block)
end
if @chunked
read_body_chunked(&block)
elsif @content_length
read_body_length(&block)
else
read_body_rest(&block)
end
rescue
close
raise
end
if eof?
if @next_connection
@state = :WAIT
else
def get_body(to = nil, &block)
cast_to_io(to, block) do |io|
begin
read_header if @state == :META
return nil if @state != :DATA
if @transparent_gzip_decompression
io = content_inflater(@content_encoding, io)
end
if @chunked
read_body_chunked(io)
elsif @content_length
read_body_length(io)
else
read_body_rest(io)
end
rescue
close
raise
end
if eof?
if @next_connection
@state = :WAIT
else
close
end
end
end
nil
Expand Down Expand Up @@ -697,7 +697,7 @@ def first_inflate(body)
end
end

def content_inflater_block(content_encoding, block)
def content_inflater(content_encoding, io)
case content_encoding
when 'gzip', 'x-gzip'
# zlib itself has a functionality to decompress gzip stream.
Expand All @@ -706,15 +706,12 @@ def content_inflater_block(content_encoding, block)
# > windowBits can also be greater than 15 for optional gzip decoding. Add 32 to
# > windowBits to enable zlib and gzip decoding with automatic header detection,
# > or add 16 to decode only the gzip format
inflate_stream = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
IOInflater.new(io, Zlib::Inflate.new(Zlib::MAX_WBITS + 32))
when 'deflate'
inflate_stream = LenientInflater.new
IOInflater.new(io, LenientInflater.new)
else
return block
io
end
Proc.new { |buf|
block.call(inflate_stream.inflate(buf))
}
end

def set_header(req)
Expand Down Expand Up @@ -872,35 +869,20 @@ def parse_content_header(key, value)
end
end

def read_body_length(&block)
return nil if @content_length == 0
while true
buf = empty_bin_str
maxbytes = @read_block_size
maxbytes = @content_length if maxbytes > @content_length && @content_length > 0
::Timeout.timeout(@receive_timeout, ReceiveTimeoutError) do
begin
@socket.readpartial(maxbytes, buf)
rescue EOFError
close
buf = nil
if @strict_response_size_check
raise BadResponseError.new("EOF while reading rest #{@content_length} bytes")
end
end
end
if buf && buf.bytesize > 0
@content_length -= buf.bytesize
yield buf
else
@content_length = 0
end
return if @content_length == 0
# Copies the remaining @content_length bytes of the response body from
# @socket into +io+ using IO.copy_stream.
#
# io:: destination object handed to IO.copy_stream.
#
# Raises BadResponseError when @strict_response_size_check is set and fewer
# bytes than announced were copied. Otherwise @content_length is reset to 0.
#
# NOTE(review): the single Timeout block wraps the whole copy, so
# @receive_timeout appears to bound the entire body transfer rather than
# one read -- confirm this is intended.
def read_body_length(io)
  copied = ::Timeout.timeout(@receive_timeout, ReceiveTimeoutError) do
    IO.copy_stream(@socket, io, @content_length)
  end
  @content_length -= copied

  # Anything still outstanding here means the copy stopped short.
  missing = @content_length
  if @strict_response_size_check && missing > 0
    raise BadResponseError.new("EOF while reading rest #{missing} bytes")
  end
  @content_length = 0
end

RS = "\r\n"
def read_body_chunked(&block)
def read_body_chunked(io)
buf = empty_bin_str
while true
::Timeout.timeout(@receive_timeout, ReceiveTimeoutError) do
Expand All @@ -919,14 +901,14 @@ def read_body_chunked(&block)
@socket.read(2)
end
unless buf.empty?
yield buf
io.write(buf)
end
end
end

def read_body_rest
def read_body_rest(io)
if @readbuf and @readbuf.bytesize > 0
yield @readbuf
io.write(@readbuf)
@readbuf = nil
end
while true
Expand All @@ -942,19 +924,52 @@ def read_body_rest
end
end
if buf && buf.bytesize > 0
yield buf
io.write(buf)
else
return
end
end
end

# Normalizes the download destination into an object responding to #write
# and yields it.
#
# to::    an object responding to #write (yielded as-is), a filesystem path
#         (opened for writing and closed when the block returns), or nil.
# block:: callable wrapped in an IOBlockAdapter; only used when +to+ is
#         nil/false.
#
# Returns the value of the given block.
def cast_to_io(to, block)
  if to.respond_to?(:write)
    yield to
  elsif to
    # Response bodies are raw bytes: open the file in binary mode so no
    # newline or encoding conversion is applied on any platform.
    File.open(to, 'wb') do |file|
      yield file
    end
  else
    yield IOBlockAdapter.new(block)
  end
end

class IOInflater
def initialize(io, inflater)
@io = io
@inflater = inflater
end

def write(chunk)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that is not very well documented is that copy_stream does accept fake IO objects as long as they respond to #write(). However, it's kind of a fallback code path, as it won't be able to use sendfile(), so there will be no speed-up.

@io.write(@inflater.inflate(chunk))
chunk.bytesize
end
end

# Write-only adapter that forwards every written chunk to a callable, so a
# block can be used where an object responding to #write is expected.
class IOBlockAdapter
  # block:: callable invoked with each chunk passed to #write.
  def initialize(block)
    @callback = block
  end

  # Hands +chunk+ to the wrapped callable and returns the chunk's size in
  # bytes.
  def write(chunk)
    @callback.call(chunk)
    chunk.bytesize
  end
end

# Returns a fresh, mutable, empty String tagged with BINARY encoding (when
# the running Ruby supports encodings).
#
# String.new is used instead of a '' literal so the buffer stays mutable
# even when string literals are frozen.
def empty_bin_str
  str = String.new
  str.force_encoding('BINARY') if str.respond_to?(:force_encoding)
  str
end
end


end
2 changes: 1 addition & 1 deletion test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require 'stringio'
require 'cgi'
require 'webrick/httputils'

require 'tmpdir'

module Helper
Port = 17171
Expand Down
53 changes: 43 additions & 10 deletions test/test_httpclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,41 @@ def test_get_content_with_block
end
end

# get_content with `to:` set to a path should write the body to that file.
def test_get_content_with_path
  with_tmp_path { |destination|
    url = serverurl + 'hello'
    @client.get_content(url, to: destination)
    written = File.read(destination)
    assert_equal('hello', written)
  }
end

# get_content with `to:` set to an open File should write the body into it.
def test_get_content_with_io
  with_tmp_path { |destination|
    url = serverurl + 'hello'
    File.open(destination, 'w+') { |handle|
      @client.get_content(url, to: handle)
    }
    # Read back only after the handle has been closed by File.open.
    written = File.read(destination)
    assert_equal('hello', written)
  }
end

# With transparent decompression enabled, each compressed variant served by
# the test server should end up decoded in the destination file.
def test_get_gzipped_content_with_io
  @client.transparent_gzip_decompression = true

  endpoints = [
    'compressed?enc=gzip',
    'compressed?enc=deflate',
    'compressed?enc=deflate_noheader'
  ]
  # Same order as before; each request gets its own fresh temp path.
  endpoints.each do |endpoint|
    with_tmp_path do |destination|
      @client.get_content(serverurl + endpoint, to: destination)
      assert_equal('hello', File.read(destination))
    end
  end
end

def test_post_content
assert_equal('hello', @client.post_content(serverurl + 'hello'))
assert_equal('hello', @client.post_content(serverurl + 'redirect1'))
Expand Down Expand Up @@ -829,16 +864,6 @@ def test_get_with_block_arity_2_and_redirects
assert_nil(res.content)
end

def test_get_with_block_string_recycle
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel bad about removing that test, but unfortunately read_block_size doesn't make any sense when sendfile() is used.

But the next test covers the same behavior with a chunked response, so I think it's OK to remove it.

@client.read_block_size = 2
body = []
_res = @client.get(serverurl + 'servlet') { |str|
body << str
}
assert_equal(2, body.size)
assert_equal("get", body.join) # Was "tt" by String object recycle...
end

def test_get_with_block_chunked_string_recycle
server = TCPServer.open('localhost', 0)
server_thread = Thread.new {
Expand Down Expand Up @@ -1922,6 +1947,14 @@ def test_tcp_keepalive

private

# Yields a fixed scratch file path under Dir.tmpdir, making sure no file
# exists at that path both before the block runs and after it finishes
# (even when the block raises).
#
# Uses File.exist?: the File.exists? alias is deprecated and no longer
# available on Ruby 3.2+.
def with_tmp_path
  path = File.join(Dir.tmpdir, 'http-client-test')
  File.delete(path) if File.exist?(path)
  yield path
ensure
  # path is nil if building it failed; guard so cleanup cannot mask the error.
  File.delete(path) if path && File.exist?(path)
end

def check_query_get(query)
WEBrick::HTTPUtils.parse_query(
@client.get(serverurl + 'servlet', query).header["x-query"][0]
Expand Down